From: Tony Tkacik Date: Fri, 24 Apr 2015 10:09:48 +0000 (+0000) Subject: Merge "Create transaction on the backend datastore only when neccessary" X-Git-Tag: release/lithium~224 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=2b2517144e4eb9c17d9b41e9d9ec20d0264f5e12;hp=f3e6688eb7028378ef0863171b9e9629605f3572 Merge "Create transaction on the backend datastore only when neccessary" --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 847954816c..9a916625c9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -201,14 +201,16 @@ public class SnapshotManager implements SnapshotState { LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber); + SnapshotManager.this.currentState = CREATING; + try { createSnapshotProcedure.apply(null); } catch (Exception e) { + SnapshotManager.this.currentState = IDLE; LOG.error("Error creating snapshot", e); return false; } - SnapshotManager.this.currentState = CREATING; return true; } diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/BindingNotificationAdapterModule.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/BindingNotificationAdapterModule.java index 903cb27c92..971153bc7b 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/BindingNotificationAdapterModule.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/BindingNotificationAdapterModule.java @@ -9,18 +9,17 @@ package org.opendaylight.controller.config.yang.md.sal.binding.impl; import org.opendaylight.controller.config.api.DependencyResolver; import org.opendaylight.controller.config.api.ModuleIdentifier; -import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec; import org.opendaylight.controller.md.sal.binding.impl.BindingDOMNotificationServiceAdapter; +import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; -import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder; import org.opendaylight.controller.sal.core.api.Broker; public class BindingNotificationAdapterModule extends AbstractBindingNotificationAdapterModule { - public BindingNotificationAdapterModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver) { + public BindingNotificationAdapterModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver) { super(identifier, dependencyResolver); } - public BindingNotificationAdapterModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.md.sal.binding.impl.BindingNotificationAdapterModule oldModule, java.lang.AutoCloseable oldInstance) { + public BindingNotificationAdapterModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver, final org.opendaylight.controller.config.yang.md.sal.binding.impl.BindingNotificationAdapterModule oldModule, final java.lang.AutoCloseable oldInstance) { super(identifier, dependencyResolver, oldModule, oldInstance); } @@ -34,7 +33,7 @@ public class BindingNotificationAdapterModule extends AbstractBindingNotificatio final BindingToNormalizedNodeCodec codec = getBindingMappingServiceDependency(); final Broker.ProviderSession session = getDomAsyncBrokerDependency().registerProvider(new DummyDOMProvider()); final DOMNotificationService notifService = session.getService(DOMNotificationService.class); - return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), notifService, SingletonHolder.INVOKER_FACTORY); + return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), notifService); } } diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/NotificationBrokerImplModule.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/NotificationBrokerImplModule.java index 415c9783da..58d5a85565 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/NotificationBrokerImplModule.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/NotificationBrokerImplModule.java @@ -7,9 +7,12 @@ */ package org.opendaylight.controller.config.yang.md.sal.binding.impl; -import com.google.common.util.concurrent.ListeningExecutorService; +import org.opendaylight.controller.md.sal.binding.compat.HydrogenNotificationBrokerImpl; + +import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; +import org.opendaylight.controller.md.sal.binding.api.NotificationService; +import org.opendaylight.controller.md.sal.binding.compat.HeliumNotificationProviderServiceAdapter; import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder; -import org.opendaylight.controller.sal.binding.impl.NotificationBrokerImpl; /** * @@ -17,14 +20,14 @@ import org.opendaylight.controller.sal.binding.impl.NotificationBrokerImpl; public final class NotificationBrokerImplModule extends org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractNotificationBrokerImplModule { - public NotificationBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, - org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { + public NotificationBrokerImplModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, + final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { super(identifier, dependencyResolver); } - public NotificationBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, - org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, - NotificationBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) { + public NotificationBrokerImplModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, + final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, + final NotificationBrokerImplModule oldModule, final java.lang.AutoCloseable oldInstance) { super(identifier, dependencyResolver, oldModule, oldInstance); } @@ -37,14 +40,20 @@ public final class NotificationBrokerImplModule extends @Override public java.lang.AutoCloseable createInstance() { + final NotificationPublishService notificationPublishService = getNotificationPublishAdapterDependency(); + final NotificationService notificationService = getNotificationAdapterDependency(); + + if(notificationPublishService != null & notificationService != null) { + return new HeliumNotificationProviderServiceAdapter(notificationPublishService, notificationService); + } + /* * FIXME: Switch to new broker (which has different threading model) * once this change is communicated with downstream users or * we will have adapter implementation which will honor Helium * threading model for notifications. */ - ListeningExecutorService listeningExecutor = SingletonHolder.getDefaultNotificationExecutor(); - NotificationBrokerImpl broker = new NotificationBrokerImpl(listeningExecutor); - return broker; + + return new HydrogenNotificationBrokerImpl(SingletonHolder.getDefaultNotificationExecutor()); } } diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/AbstractNotificationListenerRegistration.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/AbstractNotificationListenerRegistration.java similarity index 96% rename from opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/AbstractNotificationListenerRegistration.java rename to opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/AbstractNotificationListenerRegistration.java index 5e7c91374f..540c261316 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/AbstractNotificationListenerRegistration.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/AbstractNotificationListenerRegistration.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.sal.binding.impl; +package org.opendaylight.controller.md.sal.binding.compat; import org.opendaylight.controller.sal.binding.api.NotificationListener; import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/AggregatedNotificationListenerRegistration.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/AggregatedNotificationListenerRegistration.java similarity index 97% rename from opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/AggregatedNotificationListenerRegistration.java rename to opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/AggregatedNotificationListenerRegistration.java index f0db891f14..151439945e 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/AggregatedNotificationListenerRegistration.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/AggregatedNotificationListenerRegistration.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.sal.binding.impl; +package org.opendaylight.controller.md.sal.binding.compat; import org.opendaylight.controller.sal.binding.api.NotificationListener; import org.opendaylight.yangtools.yang.binding.Notification; diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/HydrogenNotificationBrokerImpl.java similarity index 95% rename from opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java rename to opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/HydrogenNotificationBrokerImpl.java index 58e46ceca3..c50c4cb599 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/HydrogenNotificationBrokerImpl.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.sal.binding.impl; +package org.opendaylight.controller.md.sal.binding.compat; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -28,15 +28,15 @@ import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; -public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class); +public class HydrogenNotificationBrokerImpl implements NotificationProviderService, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(HydrogenNotificationBrokerImpl.class); private final ListenerRegistry interestListeners = ListenerRegistry.create(); private final AtomicReference listeners = new AtomicReference<>(new ListenerMapGeneration()); private final ExecutorService executor; - public NotificationBrokerImpl(final ExecutorService executor) { + public HydrogenNotificationBrokerImpl(final ExecutorService executor) { this.executor = Preconditions.checkNotNull(executor); } diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/ListenerMapGeneration.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/ListenerMapGeneration.java similarity index 98% rename from opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/ListenerMapGeneration.java rename to opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/ListenerMapGeneration.java index 4d893aa7be..f59783883b 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/ListenerMapGeneration.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/ListenerMapGeneration.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.sal.binding.impl; +package org.opendaylight.controller.md.sal.binding.compat; import java.util.Arrays; import java.util.Collection; diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationListenerRegistration.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/NotificationListenerRegistration.java similarity index 95% rename from opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationListenerRegistration.java rename to opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/NotificationListenerRegistration.java index 3dba868c6f..81183c97a1 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationListenerRegistration.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/NotificationListenerRegistration.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.sal.binding.impl; +package org.opendaylight.controller.md.sal.binding.compat; import org.opendaylight.controller.sal.binding.api.NotificationListener; import org.opendaylight.yangtools.concepts.ListenerRegistration; diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotifyTask.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/NotifyTask.java similarity index 98% rename from opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotifyTask.java rename to opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/NotifyTask.java index 2622a71e55..345ec62189 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotifyTask.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/NotifyTask.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.sal.binding.impl; +package org.opendaylight.controller.md.sal.binding.compat; import org.opendaylight.yangtools.yang.binding.Notification; import org.slf4j.Logger; diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMNotificationServiceAdapter.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMNotificationServiceAdapter.java index cdf03fa552..2a31d34d01 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMNotificationServiceAdapter.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMNotificationServiceAdapter.java @@ -14,8 +14,6 @@ import org.opendaylight.controller.md.sal.binding.api.NotificationService; import org.opendaylight.controller.md.sal.binding.impl.BindingDOMAdapterBuilder.Factory; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; import org.opendaylight.controller.md.sal.dom.api.DOMService; -import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder; -import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory; import org.opendaylight.yangtools.binding.data.codec.api.BindingNormalizedNodeSerializer; import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; @@ -34,7 +32,7 @@ public class BindingDOMNotificationServiceAdapter implements NotificationService private final BindingNormalizedNodeSerializer codec; private final DOMNotificationService domNotifService; - public BindingDOMNotificationServiceAdapter(final BindingNormalizedNodeSerializer codec, final DOMNotificationService domNotifService, final NotificationInvokerFactory notificationInvokerFactory) { + public BindingDOMNotificationServiceAdapter(final BindingNormalizedNodeSerializer codec, final DOMNotificationService domNotifService) { this.codec = codec; this.domNotifService = domNotifService; } @@ -72,8 +70,7 @@ public class BindingDOMNotificationServiceAdapter implements NotificationService protected NotificationService createInstance(final BindingToNormalizedNodeCodec codec, final ClassToInstanceMap delegates) { final DOMNotificationService domNotification = delegates.getInstance(DOMNotificationService.class); - final NotificationInvokerFactory invokerFactory = SingletonHolder.INVOKER_FACTORY; - return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), domNotification, invokerFactory); + return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), domNotification); } @Override diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/yang/opendaylight-binding-broker-impl.yang b/opendaylight/md-sal/sal-binding-broker/src/main/yang/opendaylight-binding-broker-impl.yang index ee130fdeeb..866cb844d1 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/yang/opendaylight-binding-broker-impl.yang +++ b/opendaylight/md-sal/sal-binding-broker/src/main/yang/opendaylight-binding-broker-impl.yang @@ -217,6 +217,29 @@ module opendaylight-sal-binding-broker-impl { } } + augment "/config:modules/config:module/config:configuration" { + case binding-notification-broker { + when "/config:modules/config:module/config:type = 'binding-notification-broker'"; + container notification-adapter { + uses config:service-ref { + refine type { + mandatory false; + config:required-identity binding-new-notification-service; + } + } + } + + container notification-publish-adapter { + uses config:service-ref { + refine type { + mandatory false; + config:required-identity binding-new-notification-publish-service; + } + } + } + } + } + augment "/config:modules/config:module/config:state" { case binding-notification-broker { when "/config:modules/config:module/config:type = 'binding-notification-broker'"; diff --git a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/test/DataBrokerTestCustomizer.java b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/test/DataBrokerTestCustomizer.java index 547d3498c0..2647477c0f 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/test/DataBrokerTestCustomizer.java +++ b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/test/DataBrokerTestCustomizer.java @@ -23,7 +23,6 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.broker.impl.DOMNotificationRouter; import org.opendaylight.controller.md.sal.dom.broker.impl.SerializedDOMDataBroker; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; -import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder; import org.opendaylight.controller.sal.binding.test.util.MockSchemaService; import org.opendaylight.controller.sal.core.api.model.SchemaService; import org.opendaylight.controller.sal.core.spi.data.DOMStore; @@ -77,8 +76,7 @@ public class DataBrokerTestCustomizer { } public NotificationService createNotificationService() { - return new BindingDOMNotificationServiceAdapter(bindingToNormalized.getCodecRegistry(), domNotificationRouter, - SingletonHolder.INVOKER_FACTORY); + return new BindingDOMNotificationServiceAdapter(bindingToNormalized.getCodecRegistry(), domNotificationRouter); } public NotificationPublishService createNotificationPublishService() { diff --git a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java index a439e9ea26..1203a72dc3 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java +++ b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java @@ -20,19 +20,27 @@ import com.google.common.util.concurrent.MoreExecutors; import javassist.ClassPool; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.MountPointService; +import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; +import org.opendaylight.controller.md.sal.binding.api.NotificationService; +import org.opendaylight.controller.md.sal.binding.compat.HeliumNotificationProviderServiceAdapter; import org.opendaylight.controller.md.sal.binding.compat.HeliumRpcProviderRegistry; import org.opendaylight.controller.md.sal.binding.compat.HydrogenDataBrokerAdapter; import org.opendaylight.controller.md.sal.binding.compat.HydrogenMountProvisionServiceAdapter; import org.opendaylight.controller.md.sal.binding.impl.BindingDOMDataBrokerAdapter; import org.opendaylight.controller.md.sal.binding.impl.BindingDOMMountPointServiceAdapter; +import org.opendaylight.controller.md.sal.binding.impl.BindingDOMNotificationPublishServiceAdapter; +import org.opendaylight.controller.md.sal.binding.impl.BindingDOMNotificationServiceAdapter; import org.opendaylight.controller.md.sal.binding.impl.BindingDOMRpcProviderServiceAdapter; import org.opendaylight.controller.md.sal.binding.impl.BindingDOMRpcServiceAdapter; import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.controller.md.sal.dom.broker.impl.DOMNotificationRouter; import org.opendaylight.controller.md.sal.dom.broker.impl.DOMRpcRouter; import org.opendaylight.controller.md.sal.dom.broker.impl.SerializedDOMDataBroker; import org.opendaylight.controller.md.sal.dom.broker.impl.mount.DOMMountPointServiceImpl; @@ -41,7 +49,6 @@ import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.opendaylight.controller.sal.binding.api.mount.MountProviderService; -import org.opendaylight.controller.sal.binding.impl.NotificationBrokerImpl; import org.opendaylight.controller.sal.binding.impl.RootBindingAwareBroker; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; import org.opendaylight.controller.sal.core.api.BrokerService; @@ -65,7 +72,7 @@ public class BindingTestContext implements AutoCloseable { private RootBindingAwareBroker baBrokerImpl; - private NotificationBrokerImpl baNotifyImpl; + private HeliumNotificationProviderServiceAdapter baNotifyImpl; private BrokerImpl biBrokerImpl; @@ -93,6 +100,14 @@ public class BindingTestContext implements AutoCloseable { private BindingDOMRpcProviderServiceAdapter baProviderRpc; private DOMRpcRouter domRouter; + private NotificationPublishService publishService; + + private NotificationService listenService; + + private DOMNotificationPublishService domPublishService; + + private DOMNotificationService domListenService; + public DOMDataBroker getDomAsyncDataBroker() { @@ -249,7 +264,12 @@ public class BindingTestContext implements AutoCloseable { public void startBindingNotificationBroker() { checkState(executor != null); - baNotifyImpl = new NotificationBrokerImpl(executor); + final DOMNotificationRouter router = DOMNotificationRouter.create(16); + domPublishService = router; + domListenService = router; + publishService = new BindingDOMNotificationPublishServiceAdapter(codec, domPublishService); + listenService = new BindingDOMNotificationServiceAdapter(codec, domListenService); + baNotifyImpl = new HeliumNotificationProviderServiceAdapter(publishService,listenService); } diff --git a/opendaylight/md-sal/sal-binding-it/src/test/resources/controller.xml b/opendaylight/md-sal/sal-binding-it/src/test/resources/controller.xml index f9c4a043e5..7bfe254b17 100644 --- a/opendaylight/md-sal/sal-binding-it/src/test/resources/controller.xml +++ b/opendaylight/md-sal/sal-binding-it/src/test/resources/controller.xml @@ -76,6 +76,14 @@ prefix:binding-notification-broker binding-notification-broker + + prefix:binding-new-notification-service + binding-notification-adapter + + + prefix:binding-new-notification-publish-service + binding-notification-publish-adapter + prefix:binding-broker-impl @@ -162,10 +170,21 @@ prefix:dom-inmemory-data-broker inmemory-data-broker + dom:schema-service yang-schema-service + + + config-dom-store-spi:config-dom-datastore + config-store-service + + + + operational-dom-store-spi:operational-dom-datastore + operational-store-service + prefix:dom-broker-impl diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataTreeCandidateNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataTreeCandidateNode.java new file mode 100644 index 0000000000..c3940e5256 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataTreeCandidateNode.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import java.util.Collection; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; + +/** + * Abstract base class for our internal implementation of {@link DataTreeCandidateNode}, + * which we instantiate from a serialized stream. We do not retain the before-image and + * do not implement {@link #getModifiedChild(PathArgument)}, as that method is only + * useful for end users. Instances based on this class should never be leaked outside of + * this component. + */ +abstract class AbstractDataTreeCandidateNode implements DataTreeCandidateNode { + private final ModificationType type; + + protected AbstractDataTreeCandidateNode(final ModificationType type) { + this.type = Preconditions.checkNotNull(type); + } + + @Override + public final DataTreeCandidateNode getModifiedChild(final PathArgument identifier) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public final ModificationType getModificationType() { + return type; + } + + @Override + public final Optional> getDataBefore() { + throw new UnsupportedOperationException("Before-image not available after serialization"); + } + + static DataTreeCandidateNode createUnmodified() { + return new AbstractDataTreeCandidateNode(ModificationType.UNMODIFIED) { + @Override + public PathArgument getIdentifier() { + throw new UnsupportedOperationException("Root node does not have an identifier"); + } + + @Override + public Optional> getDataAfter() { + throw new UnsupportedOperationException("After-image not available after serialization"); + } + + @Override + public Collection getChildNodes() { + throw new UnsupportedOperationException("Children not available after serialization"); + } + }; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java new file mode 100644 index 0000000000..4b471cfa4a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ChainedCommitCohort extends ShardDataTreeCohort { + private static final Logger LOG = LoggerFactory.getLogger(ChainedCommitCohort.class); + private final ReadWriteShardDataTreeTransaction transaction; + private final ShardDataTreeTransactionChain chain; + private final ShardDataTreeCohort delegate; + + ChainedCommitCohort(final ShardDataTreeTransactionChain chain, final ReadWriteShardDataTreeTransaction transaction, final ShardDataTreeCohort delegate) { + this.transaction = Preconditions.checkNotNull(transaction); + this.delegate = Preconditions.checkNotNull(delegate); + this.chain = Preconditions.checkNotNull(chain); + } + + @Override + public ListenableFuture commit() { + final ListenableFuture ret = delegate.commit(); + + Futures.addCallback(ret, new FutureCallback() { + @Override + public void onSuccess(Void result) { + chain.clearTransaction(transaction); + LOG.debug("Committed transaction {}", transaction); + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Transaction {} commit failed, cannot recover", transaction, t); + } + }); + + return ret; + } + + @Override + public ListenableFuture canCommit() { + return delegate.canCommit(); + } + + @Override + public ListenableFuture preCommit() { + return delegate.preCommit(); + } + + @Override + public ListenableFuture abort() { + return delegate.abort(); + } + + @Override + DataTreeCandidateTip getCandidate() { + return delegate.getCandidate(); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java index f1f33bfc93..9a800c1659 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java @@ -7,9 +7,9 @@ */ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorSelection; import akka.dispatch.OnComplete; import java.util.List; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +59,7 @@ final class ChainedTransactionProxy extends TransactionProxy { * previous Tx's ready operations haven't completed yet. */ @Override - protected Future sendFindPrimaryShardAsync(final String shardName) { + protected Future sendFindPrimaryShardAsync(final String shardName) { // Check if there are any previous ready Futures, otherwise let the super class handle it. if(previousReadyFutures.isEmpty()) { return super.sendFindPrimaryShardAsync(shardName); @@ -75,7 +75,7 @@ final class ChainedTransactionProxy extends TransactionProxy { previousReadyFutures, getActorContext().getClientDispatcher()); // Add a callback for completion of the combined Futures. - final Promise returnPromise = akka.dispatch.Futures.promise(); + final Promise returnPromise = akka.dispatch.Futures.promise(); OnComplete> onComplete = new OnComplete>() { @Override public void onComplete(Throwable failure, Iterable notUsed) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java new file mode 100644 index 0000000000..54167b2011 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import com.google.protobuf.GeneratedMessage.GeneratedExtension; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages.AppendEntries; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class DataTreeCandidatePayload extends Payload implements Externalizable { + private static final Logger LOG = LoggerFactory.getLogger(DataTreeCandidatePayload.class); + private static final long serialVersionUID = 1L; + private static final byte DELETE = 0; + private static final byte SUBTREE_MODIFIED = 1; + private static final byte UNMODIFIED = 2; + private static final byte WRITE = 3; + + private transient byte[] serialized; + + public DataTreeCandidatePayload() { + // Required by Externalizable + } + + private DataTreeCandidatePayload(final byte[] serialized) { + this.serialized = Preconditions.checkNotNull(serialized); + } + + private static void writeChildren(final NormalizedNodeOutputStreamWriter writer, final DataOutput out, + final Collection children) throws IOException { + out.writeInt(children.size()); + for (DataTreeCandidateNode child : children) { + writeNode(writer, out, child); + } + } + + private static void writeNode(final NormalizedNodeOutputStreamWriter writer, final DataOutput out, + final DataTreeCandidateNode node) throws IOException { + switch (node.getModificationType()) { + case DELETE: + out.writeByte(DELETE); + writer.writePathArgument(node.getIdentifier()); + break; + case SUBTREE_MODIFIED: + out.writeByte(SUBTREE_MODIFIED); + writer.writePathArgument(node.getIdentifier()); + writeChildren(writer, out, node.getChildNodes()); + break; + case WRITE: + out.writeByte(WRITE); + writer.writeNormalizedNode(node.getDataAfter().get()); + break; + case UNMODIFIED: + throw new IllegalArgumentException("Unmodified candidate should never be in the payload"); + default: + throw new IllegalArgumentException("Unhandled node type " + node.getModificationType()); + } + } + + static DataTreeCandidatePayload create(DataTreeCandidate candidate) { + final ByteArrayDataOutput out = ByteStreams.newDataOutput(); + try (final NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(out)) { + writer.writeYangInstanceIdentifier(candidate.getRootPath()); + + final DataTreeCandidateNode node = candidate.getRootNode(); + switch (node.getModificationType()) { + case DELETE: + out.writeByte(DELETE); + break; + case SUBTREE_MODIFIED: + out.writeByte(SUBTREE_MODIFIED); + writeChildren(writer, out, node.getChildNodes()); + break; + case UNMODIFIED: + out.writeByte(UNMODIFIED); + break; + case WRITE: + out.writeByte(WRITE); + writer.writeNormalizedNode(node.getDataAfter().get()); + break; + default: + throw new IllegalArgumentException("Unhandled node type " + node.getModificationType()); + } + + writer.close(); + } catch (IOException e) { + throw new IllegalArgumentException(String.format("Failed to serialize candidate %s", candidate), e); + } + + return new DataTreeCandidatePayload(out.toByteArray()); + } + + private static Collection readChildren(final NormalizedNodeInputStreamReader reader, + final DataInput in) throws IOException { + final int size = in.readInt(); + if (size != 0) { + final Collection ret = new ArrayList<>(size); + for (int i = 0; i < size; ++i) { + final DataTreeCandidateNode child = readNode(reader, in); + if (child != null) { + ret.add(child); + } + } + return ret; + } else { + return Collections.emptyList(); + } + } + + private static DataTreeCandidateNode readNode(final NormalizedNodeInputStreamReader reader, + final DataInput in) throws IOException { + final byte type = in.readByte(); + switch (type) { + case DELETE: + return DeletedDataTreeCandidateNode.create(reader.readPathArgument()); + case SUBTREE_MODIFIED: + final PathArgument identifier = reader.readPathArgument(); + final Collection children = readChildren(reader, in); + if (children.isEmpty()) { + LOG.debug("Modified node {} does not have any children, not instantiating it", identifier); + return null; + } else { + return ModifiedDataTreeCandidateNode.create(identifier, children); + } + case UNMODIFIED: + return null; + case WRITE: + return DataTreeCandidateNodes.fromNormalizedNode(reader.readNormalizedNode()); + default: + throw new IllegalArgumentException("Unhandled node type " + type); + } + } + + private static DataTreeCandidate parseCandidate(final ByteArrayDataInput in) throws IOException { + final NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(in); + final YangInstanceIdentifier rootPath = reader.readYangInstanceIdentifier(); + final byte type = in.readByte(); + + final DataTreeCandidateNode rootNode; + switch (type) { + case DELETE: + rootNode = DeletedDataTreeCandidateNode.create(); + break; + case SUBTREE_MODIFIED: + rootNode = ModifiedDataTreeCandidateNode.create(readChildren(reader, in)); + break; + case WRITE: + rootNode = DataTreeCandidateNodes.fromNormalizedNode(reader.readNormalizedNode()); + break; + default: + throw new IllegalArgumentException("Unhandled node type " + type); + } + + return DataTreeCandidates.newDataTreeCandidate(rootPath, rootNode); + } + + DataTreeCandidate getCandidate() throws IOException { + return parseCandidate(ByteStreams.newDataInput(serialized)); + } + + @Override + @Deprecated + @SuppressWarnings("rawtypes") + public Map encode() { + return null; + } + + @Override + @Deprecated + public Payload decode(final AppendEntries.ReplicatedLogEntry.Payload payload) { + return null; + } + + @Override + public int size() { + return serialized.length; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeByte((byte)serialVersionUID); + out.writeInt(serialized.length); + out.write(serialized); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + final long version = in.readByte(); + Preconditions.checkArgument(version == serialVersionUID, "Unsupported serialization version %s", version); + + final int length = in.readInt(); + serialized = new byte[length]; + in.readFully(serialized); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DeletedDataTreeCandidateNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DeletedDataTreeCandidateNode.java new file mode 100644 index 0000000000..2df380b391 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DeletedDataTreeCandidateNode.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Optional; +import java.util.Collection; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; + +/** + * A deserialized {@link DataTreeCandidateNode} which represents a deletion. + */ +abstract class DeletedDataTreeCandidateNode extends AbstractDataTreeCandidateNode { + private DeletedDataTreeCandidateNode() { + super(ModificationType.DELETE); + } + + static DataTreeCandidateNode create() { + return new DeletedDataTreeCandidateNode() { + @Override + public PathArgument getIdentifier() { + throw new UnsupportedOperationException("Root node does not have an identifier"); + } + }; + } + + static DataTreeCandidateNode create(final PathArgument identifier) { + return new DeletedDataTreeCandidateNode() { + @Override + public final PathArgument getIdentifier() { + return identifier; + } + }; + } + + @Override + public final Optional> getDataAfter() { + return Optional.absent(); + } + + @Override + public final Collection getChildNodes() { + // We would require the before-image to reconstruct the list of nodes which + // were deleted. + throw new UnsupportedOperationException("Children not available after serialization"); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ModifiedDataTreeCandidateNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ModifiedDataTreeCandidateNode.java new file mode 100644 index 0000000000..208ec33967 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ModifiedDataTreeCandidateNode.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import java.util.Collection; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; + +/** + * A deserialized {@link DataTreeCandidateNode} which represents a modification in + * one of its children. + */ +abstract class ModifiedDataTreeCandidateNode extends AbstractDataTreeCandidateNode { + private final Collection children; + + private ModifiedDataTreeCandidateNode(final Collection children) { + super(ModificationType.SUBTREE_MODIFIED); + this.children = Preconditions.checkNotNull(children); + } + + static DataTreeCandidateNode create(final Collection children) { + return new ModifiedDataTreeCandidateNode(children) { + @Override + public PathArgument getIdentifier() { + throw new UnsupportedOperationException("Root node does not have an identifier"); + } + }; + } + + static DataTreeCandidateNode create(final PathArgument identifier, final Collection children) { + return new ModifiedDataTreeCandidateNode(children) { + @Override + public final PathArgument getIdentifier() { + return identifier; + } + }; + } + + @Override + public final Optional> getDataAfter() { + throw new UnsupportedOperationException("After-image not available after serialization"); + } + + @Override + public final Collection getChildNodes() { + return children; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java index 0f3ab61041..cb17335caf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction { @@ -26,7 +25,7 @@ final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTrans parent.abortTransaction(this); } - DOMStoreThreePhaseCommitCohort ready() { + ShardDataTreeCohort ready() { Preconditions.checkState(close(), "Transaction is already closed"); return parent.finishTransaction(this); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index b53d12c0c8..62d3259a71 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -63,6 +63,9 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -291,15 +294,21 @@ public class Shard extends RaftActor { } } + private static boolean isEmptyCommit(final DataTreeCandidate candidate) { + return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType()); + } + void continueCommit(final CohortEntry cohortEntry) throws Exception { + final DataTreeCandidate candidate = cohortEntry.getCohort().getCandidate(); + // If we do not have any followers and we are not using persistence // or if cohortEntry has no modifications // we can apply modification to the state immediately - if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){ - applyModificationToState(getSender(), cohortEntry.getTransactionID(), cohortEntry.getModification()); + if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) { + applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate); } else { Shard.this.persistData(getSender(), cohortEntry.getTransactionID(), - new ModificationPayload(cohortEntry.getModification())); + DataTreeCandidatePayload.create(candidate)); } } @@ -309,12 +318,37 @@ public class Shard extends RaftActor { } } + private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) { + LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID()); + + try { + // We block on the future here so we don't have to worry about possibly accessing our + // state on a different thread outside of our dispatcher. Also, the data store + // currently uses a same thread executor anyway. + cohortEntry.getCohort().commit().get(); + + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); + + shardMBean.incrementCommittedTransactionCount(); + shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); + + } catch (Exception e) { + sender.tell(new akka.actor.Status.Failure(e), getSelf()); + + LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(), + transactionID, e); + shardMBean.incrementFailedTransactionsCount(); + } finally { + commitCoordinator.currentTransactionComplete(transactionID, true); + } + } + private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) { // With persistence enabled, this method is called via applyState by the leader strategy // after the commit has been replicated to a majority of the followers. CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID); - if(cohortEntry == null) { + if (cohortEntry == null) { // The transaction is no longer the current commit. This can happen if the transaction // was aborted prior, most likely due to timeout in the front-end. We need to finish // committing the transaction though since it was successfully persisted and replicated @@ -323,7 +357,13 @@ public class Shard extends RaftActor { // transaction. cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID); if(cohortEntry != null) { - commitWithNewTransaction(cohortEntry.getModification()); + try { + store.applyForeignCandidate(transactionID, cohortEntry.getCohort().getCandidate()); + } catch (DataValidationFailedException e) { + shardMBean.incrementFailedTransactionsCount(); + LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e); + } + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); } else { // This really shouldn't happen - it likely means that persistence or replication @@ -334,31 +374,8 @@ public class Shard extends RaftActor { LOG.error(ex.getMessage()); sender.tell(new akka.actor.Status.Failure(ex), getSelf()); } - - return; - } - - LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID()); - - try { - // We block on the future here so we don't have to worry about possibly accessing our - // state on a different thread outside of our dispatcher. Also, the data store - // currently uses a same thread executor anyway. - cohortEntry.getCohort().commit().get(); - - sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); - - shardMBean.incrementCommittedTransactionCount(); - shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); - - } catch (Exception e) { - sender.tell(new akka.actor.Status.Failure(e), getSelf()); - - LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(), - transactionID, e); - shardMBean.incrementFailedTransactionsCount(); - } finally { - commitCoordinator.currentTransactionComplete(transactionID, true); + } else { + finishCommit(sender, transactionID, cohortEntry); } } @@ -556,15 +573,25 @@ public class Shard extends RaftActor { @Override protected void applyState(final ActorRef clientActor, final String identifier, final Object data) { - - if(data instanceof ModificationPayload) { + if (data instanceof DataTreeCandidatePayload) { + if (clientActor == null) { + // No clientActor indicates a replica coming from the leader + try { + store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate()); + } catch (DataValidationFailedException | IOException e) { + LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e); + } + } else { + // Replication consensus reached, proceed to commit + finishCommit(clientActor, identifier); + } + } else if (data instanceof ModificationPayload) { try { applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification()); } catch (ClassNotFoundException | IOException e) { LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e); } - } - else if (data instanceof CompositeModificationPayload) { + } else if (data instanceof CompositeModificationPayload) { Object modification = ((CompositeModificationPayload) data).getModification(); applyModificationToState(clientActor, identifier, modification); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 0eb48fd180..30947fa666 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -30,7 +30,6 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.slf4j.Logger; /** @@ -42,7 +41,7 @@ public class ShardCommitCoordinator { // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. public interface CohortDecorator { - DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual); + ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual); } private final Cache cohortCache; @@ -413,8 +412,7 @@ public class ShardCommitCoordinator { static class CohortEntry { private final String transactionID; - private DOMStoreThreePhaseCommitCohort cohort; - private final MutableCompositeModification compositeModification; + private ShardDataTreeCohort cohort; private final ReadWriteShardDataTreeTransaction transaction; private ActorRef replySender; private Shard shard; @@ -422,16 +420,14 @@ public class ShardCommitCoordinator { private boolean doImmediateCommit; CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) { - this.compositeModification = new MutableCompositeModification(); this.transaction = Preconditions.checkNotNull(transaction); this.transactionID = transactionID; } - CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort, + CohortEntry(String transactionID, ShardDataTreeCohort cohort, MutableCompositeModification compositeModification) { this.transactionID = transactionID; this.cohort = cohort; - this.compositeModification = compositeModification; this.transaction = null; } @@ -447,17 +443,12 @@ public class ShardCommitCoordinator { return transactionID; } - DOMStoreThreePhaseCommitCohort getCohort() { + ShardDataTreeCohort getCohort() { return cohort; } - MutableCompositeModification getModification() { - return compositeModification; - } - void applyModifications(Iterable modifications) { - for(Modification modification: modifications) { - compositeModification.addModification(modification); + for (Modification modification : modifications) { modification.apply(transaction.getSnapshot()); } } @@ -498,9 +489,5 @@ public class ShardCommitCoordinator { void setShard(Shard shard) { this.shard = shard; } - - boolean hasModifications(){ - return compositeModification.getModifications().size() > 0; - } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 373bf499e0..56c5eb65bf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -21,14 +21,13 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent; import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask; import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -93,7 +92,7 @@ public final class ShardDataTree extends ShardDataTreeTransactionParent { return ensureTransactionChain(chainId).newReadWriteTransaction(txId); } - void notifyListeners(final DataTreeCandidateTip candidate) { + void notifyListeners(final DataTreeCandidate candidate) { LOG.debug("Notifying listeners on candidate {}", candidate); // DataTreeChanges first, as they are more light-weight @@ -116,7 +115,7 @@ public final class ShardDataTree extends ShardDataTreeTransactionParent { if (chain != null) { chain.close(); } else { - LOG.warn("Closing non-existent transaction chain {}", transactionChainId); + LOG.debug("Closing non-existent transaction chain {}", transactionChainId); } } @@ -152,15 +151,30 @@ public final class ShardDataTree extends ShardDataTreeTransactionParent { return new SimpleEntry<>(reg, event); } + void applyForeignCandidate(final String identifier, final DataTreeCandidate foreign) throws DataValidationFailedException { + LOG.debug("Applying foreign transaction {}", identifier); + + final DataTreeModification mod = dataTree.takeSnapshot().newModification(); + DataTreeCandidates.applyToModification(mod, foreign); + mod.ready(); + + LOG.trace("Applying foreign modification {}", mod); + dataTree.validate(mod); + final DataTreeCandidate candidate = dataTree.prepare(mod); + dataTree.commit(candidate); + notifyListeners(candidate); + } + @Override void abortTransaction(final AbstractShardDataTreeTransaction transaction) { // Intentional no-op } @Override - DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { + ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { final DataTreeModification snapshot = transaction.getSnapshot(); snapshot.ready(); - return new ShardDataTreeCohort(this, snapshot); + return new SimpleShardDataTreeCohort(this, snapshot); } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java index 11b3ca8ed7..213e36a570 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java @@ -7,72 +7,23 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Futures; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ListenableFuture; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -final class ShardDataTreeCohort implements DOMStoreThreePhaseCommitCohort { - private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeCohort.class); - private static final ListenableFuture TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE); - private static final ListenableFuture VOID_FUTURE = Futures.immediateFuture(null); - private final DataTreeModification transaction; - private final ShardDataTree dataTree; - private DataTreeCandidateTip candidate; - - ShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction) { - this.dataTree = Preconditions.checkNotNull(dataTree); - this.transaction = Preconditions.checkNotNull(transaction); - } - - @Override - public ListenableFuture canCommit() { - try { - dataTree.getDataTree().validate(transaction); - LOG.debug("Transaction {} validated", transaction); - return TRUE_FUTURE; - } catch (Exception e) { - return Futures.immediateFailedFuture(e); - } - } - - @Override - public ListenableFuture preCommit() { - try { - candidate = dataTree.getDataTree().prepare(transaction); - /* - * FIXME: this is the place where we should be interacting with persistence, specifically by invoking - * persist on the candidate (which gives us a Future). - */ - LOG.debug("Transaction {} prepared candidate {}", transaction, candidate); - return VOID_FUTURE; - } catch (Exception e) { - LOG.debug("Transaction {} failed to prepare", transaction, e); - return Futures.immediateFailedFuture(e); - } - } - - @Override - public ListenableFuture abort() { - // No-op, really - return VOID_FUTURE; +public abstract class ShardDataTreeCohort { + ShardDataTreeCohort() { + // Prevent foreign instantiation } - @Override - public ListenableFuture commit() { - try { - dataTree.getDataTree().commit(candidate); - } catch (Exception e) { - LOG.error("Transaction {} failed to commit", transaction, e); - return Futures.immediateFailedFuture(e); - } + abstract DataTreeCandidateTip getCandidate(); - LOG.debug("Transaction {} committed, proceeding to notify", transaction); - dataTree.notifyListeners(candidate); - return VOID_FUTURE; - } + @VisibleForTesting + public abstract ListenableFuture canCommit(); + @VisibleForTesting + public abstract ListenableFuture preCommit(); + @VisibleForTesting + public abstract ListenableFuture abort(); + @VisibleForTesting + public abstract ListenableFuture commit(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java index 780d940128..183c2192e4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java @@ -9,12 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import javax.annotation.concurrent.NotThreadSafe; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,17 +72,17 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent } @Override - protected DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { + protected ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { Preconditions.checkState(openTransaction != null, "Attempted to finish transaction %s while none is outstanding", transaction); // dataTree is finalizing ready the transaction, we just record it for the next // transaction in chain - final DOMStoreThreePhaseCommitCohort delegate = dataTree.finishTransaction(transaction); + final ShardDataTreeCohort delegate = dataTree.finishTransaction(transaction); openTransaction = null; previousTx = transaction; LOG.debug("Committing transaction {}", transaction); - return new CommitCohort(transaction, delegate); + return new ChainedCommitCohort(this, transaction, delegate); } @Override @@ -95,40 +90,9 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent return MoreObjects.toStringHelper(this).add("id", chainId).toString(); } - private final class CommitCohort extends ForwardingDOMStoreThreePhaseCommitCohort { - private final ReadWriteShardDataTreeTransaction transaction; - private final DOMStoreThreePhaseCommitCohort delegate; - - CommitCohort(final ReadWriteShardDataTreeTransaction transaction, final DOMStoreThreePhaseCommitCohort delegate) { - this.transaction = Preconditions.checkNotNull(transaction); - this.delegate = Preconditions.checkNotNull(delegate); - } - - @Override - protected DOMStoreThreePhaseCommitCohort delegate() { - return delegate; - } - - @Override - public ListenableFuture commit() { - final ListenableFuture ret = super.commit(); - - Futures.addCallback(ret, new FutureCallback() { - @Override - public void onSuccess(Void result) { - if (transaction.equals(previousTx)) { - previousTx = null; - } - LOG.debug("Committed transaction {}", transaction); - } - - @Override - public void onFailure(Throwable t) { - LOG.error("Transaction {} commit failed, cannot recover", transaction, t); - } - }); - - return ret; + void clearTransaction(ReadWriteShardDataTreeTransaction transaction) { + if (transaction.equals(previousTx)) { + previousTx = null; } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java index 6cc1408eae..ee04aff515 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java @@ -7,9 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; - abstract class ShardDataTreeTransactionParent { abstract void abortTransaction(AbstractShardDataTreeTransaction transaction); - abstract DOMStoreThreePhaseCommitCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction); + abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index f9d3050015..797641978d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -7,9 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.collect.Lists; import java.io.IOException; -import java.util.List; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; @@ -17,10 +15,10 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.slf4j.Logger; @@ -36,52 +34,55 @@ import org.slf4j.Logger; */ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build(); - private final ShardDataTree store; - private List currentLogRecoveryBatch; + private final DataTree store; private final String shardName; private final Logger log; + private DataTreeModification transaction; + private int size; ShardRecoveryCoordinator(ShardDataTree store, String shardName, Logger log) { - this.store = store; + this.store = store.getDataTree(); this.shardName = shardName; this.log = log; } @Override public void startLogRecoveryBatch(int maxBatchSize) { - currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize); - log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize); + transaction = store.takeSnapshot().newModification(); + size = 0; } @Override public void appendRecoveredLogEntry(Payload payload) { try { - if(payload instanceof ModificationPayload) { - currentLogRecoveryBatch.add((ModificationPayload) payload); + if (payload instanceof DataTreeCandidatePayload) { + DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate()); + size++; + } else if (payload instanceof ModificationPayload) { + MutableCompositeModification.fromSerializable( + ((ModificationPayload) payload).getModification()).apply(transaction); + size++; } else if (payload instanceof CompositeModificationPayload) { - currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable( - ((CompositeModificationPayload) payload).getModification()))); + MutableCompositeModification.fromSerializable( + ((CompositeModificationPayload) payload).getModification()).apply(transaction); + size++; } else if (payload instanceof CompositeModificationByteStringPayload) { - currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable( - ((CompositeModificationByteStringPayload) payload).getModification()))); + MutableCompositeModification.fromSerializable( + ((CompositeModificationByteStringPayload) payload).getModification()).apply(transaction); + size++; } else { log.error("{}: Unknown payload {} received during recovery", shardName, payload); } - } catch (IOException e) { + } catch (IOException | ClassNotFoundException e) { log.error("{}: Error extracting ModificationPayload", shardName, e); } - } - private void commitTransaction(ReadWriteShardDataTreeTransaction transaction) { - DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction); - try { - commitCohort.preCommit().get(); - commitCohort.commit().get(); - } catch (Exception e) { - log.error("{}: Failed to commit Tx on recovery", shardName, e); - } + private void commitTransaction(DataTreeModification tx) throws DataValidationFailedException { + tx.ready(); + store.validate(tx); + store.commit(store.prepare(tx)); } /** @@ -89,21 +90,13 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { */ @Override public void applyCurrentLogRecoveryBatch() { - log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size()); - - ReadWriteShardDataTreeTransaction writeTx = store.newReadWriteTransaction(shardName + "-recovery", null); - DataTreeModification snapshot = writeTx.getSnapshot(); - for (ModificationPayload payload : currentLogRecoveryBatch) { - try { - MutableCompositeModification.fromSerializable(payload.getModification()).apply(snapshot); - } catch (Exception e) { - log.error("{}: Error extracting ModificationPayload", shardName, e); - } + log.debug("{}: Applying current log recovery batch with size {}", shardName, size); + try { + commitTransaction(transaction); + } catch (DataValidationFailedException e) { + log.error("{}: Failed to apply recovery batch", shardName, e); } - - commitTransaction(writeTx); - - currentLogRecoveryBatch = null; + transaction = null; } /** @@ -115,19 +108,13 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { public void applyRecoverySnapshot(final byte[] snapshotBytes) { log.debug("{}: Applying recovered snapshot", shardName); - // Intentionally bypass normal transaction to side-step persistence/replication - final DataTree tree = store.getDataTree(); - DataTreeModification writeTx = tree.takeSnapshot().newModification(); - - NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); - - writeTx.write(ROOT, node); - writeTx.ready(); + final NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); + final DataTreeModification tx = store.takeSnapshot().newModification(); + tx.write(ROOT, node); try { - tree.validate(writeTx); - tree.commit(tree.prepare(writeTx)); + commitTransaction(tx); } catch (DataValidationFailedException e) { - log.error("{}: Failed to validate recovery snapshot", shardName, e); + log.error("{}: Failed to apply recovery snapshot", shardName, e); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java index 35d8e922f2..600509a26b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java @@ -14,7 +14,6 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactio import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; @@ -86,7 +85,7 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction) throws ExecutionException, InterruptedException { - DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction); + ShardDataTreeCohort commitCohort = store.finishTransaction(transaction); commitCohort.preCommit().get(); commitCohort.commit().get(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index 69a696f294..365f97dd3f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -31,7 +31,6 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; /** * @author: syedbahm @@ -197,7 +196,7 @@ public class ShardWriteTransaction extends ShardTransaction { LOG.debug("readyTransaction : {}", transactionID); - DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); + ShardDataTreeCohort cohort = transaction.ready(); getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(), cohort, compositeModification, returnSerialized, doImmediateCommit), getContext()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java new file mode 100644 index 0000000000..9f22ce8a73 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { + private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class); + private static final ListenableFuture TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE); + private static final ListenableFuture VOID_FUTURE = Futures.immediateFuture(null); + private final DataTreeModification transaction; + private final ShardDataTree dataTree; + private DataTreeCandidateTip candidate; + + SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction) { + this.dataTree = Preconditions.checkNotNull(dataTree); + this.transaction = Preconditions.checkNotNull(transaction); + } + + @Override + DataTreeCandidateTip getCandidate() { + return candidate; + } + + @Override + public ListenableFuture canCommit() { + try { + dataTree.getDataTree().validate(transaction); + LOG.debug("Transaction {} validated", transaction); + return TRUE_FUTURE; + } catch (Exception e) { + return Futures.immediateFailedFuture(e); + } + } + + @Override + public ListenableFuture preCommit() { + try { + candidate = dataTree.getDataTree().prepare(transaction); + /* + * FIXME: this is the place where we should be interacting with persistence, specifically by invoking + * persist on the candidate (which gives us a Future). + */ + LOG.debug("Transaction {} prepared candidate {}", transaction, candidate); + return VOID_FUTURE; + } catch (Exception e) { + LOG.debug("Transaction {} failed to prepare", transaction, e); + return Futures.immediateFailedFuture(e); + } + } + + @Override + public ListenableFuture abort() { + // No-op, really + return VOID_FUTURE; + } + + @Override + public ListenableFuture commit() { + try { + dataTree.getDataTree().commit(candidate); + } catch (Exception e) { + LOG.error("Transaction {} failed to commit", transaction, e); + return Futures.immediateFailedFuture(e); + } + + LOG.debug("Transaction {} committed, proceeding to notify", transaction); + dataTree.notifyListeners(candidate); + return VOID_FUTURE; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index f12fdd99ea..e397ab501c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; @@ -159,7 +160,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction sendFindPrimaryShardAsync(String shardName) { + protected Future sendFindPrimaryShardAsync(String shardName) { return actorContext.findPrimaryShardAsync(shardName); } @@ -497,20 +498,20 @@ public class TransactionProxy extends AbstractDOMStoreTransaction findPrimaryFuture = sendFindPrimaryShardAsync(shardName); + Future findPrimaryFuture = sendFindPrimaryShardAsync(shardName); final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName); txFutureCallback = newTxFutureCallback; txFutureCallbackMap.put(shardName, txFutureCallback); - findPrimaryFuture.onComplete(new OnComplete() { + findPrimaryFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, ActorSelection primaryShard) { + public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) { if(failure != null) { newTxFutureCallback.createTransactionContext(failure, null); } else { - newTxFutureCallback.setPrimaryShard(primaryShard); + newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor()); } } }, actorContext.getClientDispatcher()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java index cdd7859a30..2f48ab9d1b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java @@ -7,8 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore.messages; +import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort; import org.opendaylight.controller.cluster.datastore.modification.Modification; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; /** * Transaction ReadyTransaction message that is forwarded to the local Shard from the ShardTransaction. @@ -17,14 +17,14 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh */ public class ForwardedReadyTransaction { private final String transactionID; - private final DOMStoreThreePhaseCommitCohort cohort; + private final ShardDataTreeCohort cohort; private final Modification modification; private final boolean returnSerialized; private final boolean doImmediateCommit; private final short txnClientVersion; public ForwardedReadyTransaction(String transactionID, short txnClientVersion, - DOMStoreThreePhaseCommitCohort cohort, Modification modification, + ShardDataTreeCohort cohort, Modification modification, boolean returnSerialized, boolean doImmediateCommit) { this.transactionID = transactionID; this.cohort = cohort; @@ -38,7 +38,7 @@ public class ForwardedReadyTransaction { return transactionID; } - public DOMStoreThreePhaseCommitCohort getCohort() { + public ShardDataTreeCohort getCohort() { return cohort; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryShardInfo.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryShardInfo.java new file mode 100644 index 0000000000..bbeb1aa84b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryShardInfo.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import akka.actor.ActorSelection; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; + +/** + * Local message DTO that contains information about the primary shard. + * + * @author Thomas Pantelis + */ +public class PrimaryShardInfo { + private final ActorSelection primaryShardActor; + private final Optional localShardDataTree; + + public PrimaryShardInfo(@Nonnull ActorSelection primaryShardActor, @Nonnull Optional localShardDataTree) { + this.primaryShardActor = Preconditions.checkNotNull(primaryShardActor); + this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree); + } + + /** + * Returns an ActorSelection representing the primary shard actor. + */ + public @Nonnull ActorSelection getPrimaryShardActor() { + return primaryShardActor; + } + + /** + * Returns an Optional whose value contains the primary shard's DataTree if the primary shard is local + * to the caller. Otherwise the Optional value is absent. + */ + public @Nonnull Optional getLocalShardDataTree() { + return localShardDataTree; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 17d988005f..afa773b461 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -45,8 +45,10 @@ import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.reporting.MetricsReporter; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,7 +98,7 @@ public class ActorContext { private Timeout transactionCommitOperationTimeout; private Timeout shardInitializationTimeout; private final Dispatchers dispatchers; - private Cache> primaryShardActorSelectionCache; + private Cache> primaryShardInfoCache; private volatile SchemaContext schemaContext; private volatile boolean updated; @@ -141,7 +143,7 @@ public class ActorContext { shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2)); - primaryShardActorSelectionCache = CacheBuilder.newBuilder() + primaryShardInfoCache = CacheBuilder.newBuilder() .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) .build(); } @@ -196,24 +198,25 @@ public class ActorContext { return schemaContext; } - public Future findPrimaryShardAsync(final String shardName) { - Future ret = primaryShardActorSelectionCache.getIfPresent(shardName); + public Future findPrimaryShardAsync(final String shardName) { + Future ret = primaryShardInfoCache.getIfPresent(shardName); if(ret != null){ return ret; } Future future = executeOperationAsync(shardManager, new FindPrimary(shardName, true), shardInitializationTimeout); - return future.transform(new Mapper() { + return future.transform(new Mapper() { @Override - public ActorSelection checkedApply(Object response) throws Exception { + public PrimaryShardInfo checkedApply(Object response) throws Exception { if(response instanceof PrimaryFound) { PrimaryFound found = (PrimaryFound)response; LOG.debug("Primary found {}", found.getPrimaryPath()); ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath()); - primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection)); - return actorSelection; + PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.absent()); + primaryShardInfoCache.put(shardName, Futures.successful(info)); + return info; } else if(response instanceof NotInitializedException) { throw (NotInitializedException)response; } else if(response instanceof PrimaryNotFoundException) { @@ -387,15 +390,15 @@ public class ActorContext { public void broadcast(final Object message){ for(final String shardName : configuration.getAllShardNames()){ - Future primaryFuture = findPrimaryShardAsync(shardName); - primaryFuture.onComplete(new OnComplete() { + Future primaryFuture = findPrimaryShardAsync(shardName); + primaryFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, ActorSelection primaryShard) { + public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) { if(failure != null) { LOG.warn("broadcast failed to send message {} to shard {}: {}", message.getClass().getSimpleName(), shardName, failure); } else { - primaryShard.tell(message, ActorRef.noSender()); + primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender()); } } }, getClientDispatcher()); @@ -553,7 +556,7 @@ public class ActorContext { } @VisibleForTesting - Cache> getPrimaryShardActorSelectionCache() { - return primaryShardActorSelectionCache; + Cache> getPrimaryShardInfoCache() { + return primaryShardInfoCache; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 03f2bb7ad0..1100f3a7fa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -21,7 +21,6 @@ import akka.japi.Creator; import akka.testkit.TestActorRef; import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import java.util.Collections; @@ -42,10 +41,6 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; @@ -53,6 +48,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; @@ -172,49 +168,35 @@ public abstract class AbstractShardTest extends AbstractActorTest{ Assert.fail(String.format("Expected last applied: %d, Actual: %d", expectedValue, lastApplied)); } - protected NormalizedNode readStore(final InMemoryDOMDataStore store) throws ReadFailedException { - DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); - CheckedFuture>, ReadFailedException> read = - transaction.read(YangInstanceIdentifier.builder().build()); - - Optional> optional = read.checkedGet(); - - NormalizedNode normalizedNode = optional.get(); - - transaction.close(); - - return normalizedNode; - } - - protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName, + protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName, final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode data, final MutableCompositeModification modification) { return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null); } - protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName, + protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName, final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode data, final MutableCompositeModification modification, - final Function> preCommit) { + final Function> preCommit) { ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction("setup-mock-" + cohortName, null); tx.getSnapshot().write(path, data); - DOMStoreThreePhaseCommitCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit); + ShardDataTreeCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit); modification.addModification(new WriteModification(path, data)); return cohort; } - protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName, - final DOMStoreThreePhaseCommitCohort actual) { + protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName, + final ShardDataTreeCohort actual) { return createDelegatingMockCohort(cohortName, actual, null); } - protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName, - final DOMStoreThreePhaseCommitCohort actual, - final Function> preCommit) { - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName); + protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName, + final ShardDataTreeCohort actual, + final Function> preCommit) { + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, cohortName); doAnswer(new Answer>() { @Override @@ -248,6 +230,13 @@ public abstract class AbstractShardTest extends AbstractActorTest{ } }).when(cohort).abort(); + doAnswer(new Answer() { + @Override + public DataTreeCandidateTip answer(final InvocationOnMock invocation) { + return actual.getCandidate(); + } + }).when(cohort).getCandidate(); + return cohort; } @@ -275,7 +264,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null); transaction.getSnapshot().write(id, node); - DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); + ShardDataTreeCohort cohort = transaction.ready(); cohort.canCommit().get(); cohort.preCommit().get(); cohort.commit(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index abe7f7678c..a64a5802b8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -54,6 +54,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionR import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; @@ -71,6 +72,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -309,6 +311,11 @@ public abstract class AbstractTransactionProxyTest { return setupActorContextWithoutInitialCreateTransaction(actorSystem, DefaultShardStrategy.DEFAULT_SHARD); } + protected Future primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef) { + return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()), + Optional.absent())); + } + protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName) { ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); log.info("Created mock shard actor {}", actorRef); @@ -316,7 +323,7 @@ public abstract class AbstractTransactionProxyTest { doReturn(actorSystem.actorSelection(actorRef.path())). when(mockActorContext).actorSelection(actorRef.path().toString()); - doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))). + doReturn(primaryShardInfoReply(actorSystem, actorRef)). when(mockActorContext).findPrimaryShardAsync(eq(shardName)); doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java new file mode 100644 index 0000000000..781c3dba71 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import java.io.IOException; +import java.util.Collection; +import org.apache.commons.lang3.SerializationUtils; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; + +public class DataTreeCandidatePayloadTest { + private DataTreeCandidate candidate; + + private static DataTreeCandidateNode findNode(final Collection nodes, final PathArgument arg) { + for (DataTreeCandidateNode node : nodes) { + if (arg.equals(node.getIdentifier())) { + return node; + } + } + return null; + } + + private static void assertChildrenEquals(final Collection expected, + final Collection actual) { + // Make sure all expected nodes are there + for (DataTreeCandidateNode exp : expected) { + final DataTreeCandidateNode act = findNode(actual, exp.getIdentifier()); + assertNotNull("missing expected child", act); + assertCandidateNodeEquals(exp, act); + } + // Make sure no nodes are present which are not in the expected set + for (DataTreeCandidateNode act : actual) { + final DataTreeCandidateNode exp = findNode(expected, act.getIdentifier()); + assertNull("unexpected child", exp); + } + } + + private static void assertCandidateEquals(final DataTreeCandidate expected, final DataTreeCandidate actual) { + assertEquals("root path", expected.getRootPath(), actual.getRootPath()); + + final DataTreeCandidateNode expRoot = expected.getRootNode(); + final DataTreeCandidateNode actRoot = expected.getRootNode(); + assertEquals("root type", expRoot.getModificationType(), actRoot.getModificationType()); + + switch (actRoot.getModificationType()) { + case DELETE: + case WRITE: + assertEquals("root data", expRoot.getDataAfter(), actRoot.getDataAfter()); + break; + case SUBTREE_MODIFIED: + assertChildrenEquals(expRoot.getChildNodes(), actRoot.getChildNodes()); + break; + default: + fail("Unexpect root type " + actRoot.getModificationType()); + break; + } + + assertCandidateNodeEquals(expected.getRootNode(), actual.getRootNode()); + } + + private static void assertCandidateNodeEquals(final DataTreeCandidateNode expected, final DataTreeCandidateNode actual) { + assertEquals("child type", expected.getModificationType(), actual.getModificationType()); + assertEquals("child identifier", expected.getIdentifier(), actual.getIdentifier()); + + switch (actual.getModificationType()) { + case DELETE: + case WRITE: + assertEquals("child data", expected.getDataAfter(), actual.getDataAfter()); + break; + case SUBTREE_MODIFIED: + assertChildrenEquals(expected.getChildNodes(), actual.getChildNodes()); + break; + default: + fail("Unexpect root type " + actual.getModificationType()); + break; + } + } + + @Before + public void setUp() { + final YangInstanceIdentifier writePath = TestModel.TEST_PATH; + final NormalizedNode writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + candidate = DataTreeCandidates.fromNormalizedNode(writePath, writeData); + } + + @Test + public void testCandidateSerialization() throws IOException { + final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); + assertEquals("payload size", 141, payload.size()); + } + + @Test + public void testCandidateSerDes() throws IOException { + final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); + assertCandidateEquals(candidate, payload.getCandidate()); + } + + @Test + public void testPayloadSerDes() throws IOException { + final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); + assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 22ce50b90d..3d28672c9f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -25,7 +25,6 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; @@ -34,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -88,11 +86,9 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; @@ -100,6 +96,13 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -108,6 +111,7 @@ import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; public class ShardTest extends AbstractShardTest { + private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars"); @Test public void testRegisterChangeListener() throws Exception { @@ -379,10 +383,25 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testRecovery() throws Exception { + public void testApplyStateWithCandidatePayload() throws Exception { - // Set up the InMemorySnapshotStore. + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState"); + + NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node); + + ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, + DataTreeCandidatePayload.create(candidate))); + + shard.underlyingActor().onReceiveCommand(applyState); + + NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); + assertEquals("Applied state", node, actual); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + DataTree setupInMemorySnapshotStore() throws DataValidationFailedException { DataTree testStore = InMemoryDataTreeFactory.getInstance().create(); testStore.setSchemaContext(SCHEMA_CONTEXT); @@ -393,6 +412,55 @@ public class ShardTest extends AbstractShardTest { InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( SerializationUtils.serializeNormalizedNode(root), Collections.emptyList(), 0, 1, -1, -1)); + return testStore; + } + + private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException { + source.validate(mod); + final DataTreeCandidate candidate = source.prepare(mod); + source.commit(candidate); + return DataTreeCandidatePayload.create(candidate); + } + + @Test + public void testDataTreeCandidateRecovery() throws Exception { + // Set up the InMemorySnapshotStore. + final DataTree source = setupInMemorySnapshotStore(); + + final DataTreeModification writeMod = source.takeSnapshot().newModification(); + writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + + // Set up the InMemoryJournal. + InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod))); + + int nListEntries = 16; + Set listEntryKeys = new HashSet<>(); + + // Add some ModificationPayload entries + for (int i = 1; i <= nListEntries; i++) { + listEntryKeys.add(Integer.valueOf(i)); + + YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); + + final DataTreeModification mod = source.takeSnapshot().newModification(); + mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); + + InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + payloadForModification(source, mod))); + } + + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, + new ApplyJournalEntries(nListEntries)); + + testRecovery(listEntryKeys); + } + + @Test + public void testModicationRecovery() throws Exception { + + // Set up the InMemorySnapshotStore. + setupInMemorySnapshotStore(); // Set up the InMemoryJournal. @@ -420,7 +488,7 @@ public class ShardTest extends AbstractShardTest { testRecovery(listEntryKeys); } - private ModificationPayload newModificationPayload(final Modification... mods) throws IOException { + private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException { MutableCompositeModification compMod = new MutableCompositeModification(); for(Modification mod: mods) { compMod.addModification(mod); @@ -429,7 +497,6 @@ public class ShardTest extends AbstractShardTest { return new ModificationPayload(compMod); } - @SuppressWarnings({ "unchecked" }) @Test public void testConcurrentThreePhaseCommits() throws Throwable { new ShardTestKit(getSystem()) {{ @@ -445,19 +512,19 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), modification2); String transactionID3 = "tx3"; MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), @@ -605,12 +672,12 @@ public class ShardTest extends AbstractShardTest { }}; } - private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path, + private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path, NormalizedNode data, boolean ready, boolean doCommitOnReady) { return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady); } - private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID, + private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID, YangInstanceIdentifier path, NormalizedNode data, boolean ready, boolean doCommitOnReady) { BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID); batched.addModification(new WriteModification(path, data)); @@ -631,10 +698,10 @@ public class ShardTest extends AbstractShardTest { final String transactionID = "tx"; FiniteDuration duration = duration("5 seconds"); - final AtomicReference mockCohort = new AtomicReference<>(); + final AtomicReference mockCohort = new AtomicReference<>(); ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { @Override - public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) { + public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) { if(mockCohort.get() == null) { mockCohort.set(createDelegatingMockCohort("cohort", actual)); } @@ -699,10 +766,10 @@ public class ShardTest extends AbstractShardTest { final String transactionID = "tx"; FiniteDuration duration = duration("5 seconds"); - final AtomicReference mockCohort = new AtomicReference<>(); + final AtomicReference mockCohort = new AtomicReference<>(); ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { @Override - public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) { + public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) { if(mockCohort.get() == null) { mockCohort.set(createDelegatingMockCohort("cohort", actual)); } @@ -745,7 +812,7 @@ public class ShardTest extends AbstractShardTest { } @SuppressWarnings("unchecked") - private void verifyOuterListEntry(final TestActorRef shard, Object expIDValue) throws Exception { + private static void verifyOuterListEntry(final TestActorRef shard, Object expIDValue) throws Exception { NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", @@ -818,6 +885,8 @@ public class ShardTest extends AbstractShardTest { final AtomicBoolean overrideLeaderCalls = new AtomicBoolean(); new ShardTestKit(getSystem()) {{ Creator creator = new Creator() { + private static final long serialVersionUID = 1L; + @Override public Shard create() throws Exception { return new Shard(shardID, Collections.emptyMap(), @@ -867,7 +936,7 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore, + ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore, TestModel.TEST_PATH, containerNode, modification); FiniteDuration duration = duration("5 seconds"); @@ -909,7 +978,7 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx"; MutableCompositeModification modification = new MutableCompositeModification(); NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore, + ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore, TestModel.TEST_PATH, containerNode, modification); FiniteDuration duration = duration("5 seconds"); @@ -945,6 +1014,25 @@ public class ShardTest extends AbstractShardTest { }}; } + private static DataTreeCandidateTip mockCandidate(final String name) { + DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name); + DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node"); + doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType(); + doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter(); + doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath(); + doReturn(mockCandidateNode).when(mockCandidate).getRootNode(); + return mockCandidate; + } + + private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) { + DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name); + DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node"); + doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType(); + doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath(); + doReturn(mockCandidateNode).when(mockCandidate).getRootNode(); + return mockCandidate; + } + @Test public void testCommitWhenTransactionHasNoModifications(){ // Note that persistence is enabled which would normally result in the entry getting written to the journal @@ -959,10 +1047,11 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); + doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate(); FiniteDuration duration = duration("5 seconds"); @@ -1014,10 +1103,11 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build())); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); + doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate(); FiniteDuration duration = duration("5 seconds"); @@ -1070,14 +1160,15 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit(); + doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate(); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); @@ -1146,13 +1237,13 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit(); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); @@ -1222,7 +1313,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1270,7 +1361,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1320,7 +1411,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1339,6 +1430,11 @@ public class ShardTest extends AbstractShardTest { doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).commit(); + DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class); + DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class); + doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType(); + doReturn(candidateRoot).when(candidate).getRootNode(); + doReturn(candidate).when(cohort).getCandidate(); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, cohort, modification, true, true), getRef()); @@ -1362,7 +1458,7 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1381,6 +1477,11 @@ public class ShardTest extends AbstractShardTest { doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).commit(); + DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class); + DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class); + doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType(); + doReturn(candidateRoot).when(candidate).getRootNode(); + doReturn(candidate).when(cohort).getCandidate(); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, cohort, modification, true, true), getRef()); @@ -1404,10 +1505,10 @@ public class ShardTest extends AbstractShardTest { ShardDataTree dataStore = shard.underlyingActor().getDataStore(); final String transactionID = "tx1"; - Function> preCommit = - new Function>() { + Function> preCommit = + new Function>() { @Override - public ListenableFuture apply(final DOMStoreThreePhaseCommitCohort cohort) { + public ListenableFuture apply(final ShardDataTreeCohort cohort) { ListenableFuture preCommitFuture = cohort.preCommit(); // Simulate an AbortTransaction message occurring during replication, after @@ -1425,7 +1526,7 @@ public class ShardTest extends AbstractShardTest { }; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification, preCommit); @@ -1475,7 +1576,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), @@ -1487,7 +1588,7 @@ public class ShardTest extends AbstractShardTest { MutableCompositeModification modification2 = new MutableCompositeModification(); YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore, listNodePath, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), modification2); @@ -1546,19 +1647,19 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), modification2); String transactionID3 = "tx3"; MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3); // Ready the Tx's @@ -1620,13 +1721,13 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).abort(); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); @@ -1782,29 +1883,29 @@ public class ShardTest extends AbstractShardTest { /** * This test simply verifies that the applySnapShot logic will work * @throws ReadFailedException + * @throws DataValidationFailedException */ @Test - public void testInMemoryDataStoreRestore() throws ReadFailedException { - InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor()); - - store.onGlobalContextUpdated(SCHEMA_CONTEXT); + public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException { + DataTree store = InMemoryDataTreeFactory.getInstance().create(); + store.setSchemaContext(SCHEMA_CONTEXT); - DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction(); + DataTreeModification putTransaction = store.takeSnapshot().newModification(); putTransaction.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - commitTransaction(putTransaction); + commitTransaction(store, putTransaction); - NormalizedNode expected = readStore(store); + NormalizedNode expected = readStore(store, YangInstanceIdentifier.builder().build()); - DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction(); + DataTreeModification writeTransaction = store.takeSnapshot().newModification(); writeTransaction.delete(YangInstanceIdentifier.builder().build()); writeTransaction.write(YangInstanceIdentifier.builder().build(), expected); - commitTransaction(writeTransaction); + commitTransaction(store, writeTransaction); - NormalizedNode actual = readStore(store); + NormalizedNode actual = readStore(store, YangInstanceIdentifier.builder().build()); assertEquals(expected, actual); } @@ -1913,15 +2014,9 @@ public class ShardTest extends AbstractShardTest { shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - private void commitTransaction(final DOMStoreWriteTransaction transaction) { - DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); - ListenableFuture future = - commitCohort.preCommit(); - try { - future.get(); - future = commitCohort.commit(); - future.get(); - } catch (InterruptedException | ExecutionException e) { - } + private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException { + modification.ready(); + store.validate(modification); + store.commit(store.prepare(modification)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 93c6ddbe73..844feb2f47 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -130,7 +130,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { if (exToThrow instanceof PrimaryNotFoundException) { doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString()); } else { - doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))). + doReturn(primaryShardInfoReply(getSystem(), actorRef)). when(mockActorContext).findPrimaryShardAsync(anyString()); } @@ -209,7 +209,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext). actorSelection(actorRef.path().toString()); - doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))). + doReturn(primaryShardInfoReply(getSystem(), actorRef)). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync( @@ -834,7 +834,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { when(mockActorContext).actorSelection(shardActorRef.path().toString()); if(shardFound) { - doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). + doReturn(primaryShardInfoReply(actorSystem, shardActorRef)). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); } else { doReturn(Futures.failed(new PrimaryNotFoundException("test"))) @@ -1399,7 +1399,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(getSystem().actorSelection(shardActorRef.path())). when(mockActorContext).actorSelection(shardActorRef.path().toString()); - doReturn(Futures.successful(getSystem().actorSelection(shardActorRef.path()))). + doReturn(primaryShardInfoReply(getSystem(), shardActorRef)). when(mockActorContext).findPrimaryShardAsync(eq(shardName)); doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index 96cd3e45eb..a2309be48f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -32,6 +32,7 @@ import org.mockito.InOrder; import org.opendaylight.controller.cluster.datastore.AbstractShardTest; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.ShardDataTree; +import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort; import org.opendaylight.controller.cluster.datastore.ShardTestKit; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; @@ -57,7 +58,6 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; @@ -219,19 +219,19 @@ public class PreLithiumShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), modification2); String transactionID3 = "tx3"; MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java index bbfff70e2d..7016ada525 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java @@ -8,7 +8,7 @@ package org.opendaylight.controller.cluster.datastore.modification; import static org.junit.Assert.assertEquals; -import org.apache.commons.lang.SerializationUtils; +import org.apache.commons.lang3.SerializationUtils; import org.junit.Test; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -43,8 +43,7 @@ public class ModificationPayloadTest { assertEquals("getPath", writePath, write.getPath()); assertEquals("getData", writeData, write.getData()); - ModificationPayload cloned = - (ModificationPayload) SerializationUtils.clone(payload); + ModificationPayload cloned = SerializationUtils.clone(payload); deserialized = (MutableCompositeModification) payload.getModification(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 6b4f633778..bc80937897 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -42,6 +42,7 @@ import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; @@ -411,32 +412,36 @@ public class ActorContextTest extends AbstractActorTest{ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); + final String expPrimaryPath = "akka://test-system/find-primary-shard"; ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new PrimaryFound("akka://test-system/test")); + return Futures.successful((Object) new PrimaryFound(expPrimaryPath)); } }; - Future foobar = actorContext.findPrimaryShardAsync("foobar"); - ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); assertNotNull(actual); + assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent()); + assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(), + expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString())); - Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); - ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); + PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); - assertEquals(cachedSelection, actual); + assertEquals(cachedInfo, actual); // Wait for 200 Milliseconds. The cached entry should have been removed. Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); - cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); assertNull(cached); @@ -461,7 +466,7 @@ public class ActorContextTest extends AbstractActorTest{ }; - Future foobar = actorContext.findPrimaryShardAsync("foobar"); + Future foobar = actorContext.findPrimaryShardAsync("foobar"); try { Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); @@ -470,7 +475,7 @@ public class ActorContextTest extends AbstractActorTest{ } - Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); assertNull(cached); } @@ -494,7 +499,7 @@ public class ActorContextTest extends AbstractActorTest{ }; - Future foobar = actorContext.findPrimaryShardAsync("foobar"); + Future foobar = actorContext.findPrimaryShardAsync("foobar"); try { Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); @@ -503,7 +508,7 @@ public class ActorContextTest extends AbstractActorTest{ } - Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); assertNull(cached); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java index 4240608036..60420dcf23 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java @@ -7,54 +7,54 @@ */ package org.opendaylight.controller.md.cluster.datastore.model; +import com.google.common.io.Resources; +import java.io.IOException; import java.io.InputStream; import java.util.Collections; -import java.util.Set; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.model.api.Module; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.parser.api.YangSyntaxErrorException; import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl; public class TestModel { - public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13", - "test"); - - public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13", - "junk"); - - - public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list"); - public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list"); - public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice"); - public static final QName ID_QNAME = QName.create(TEST_QNAME, "id"); - public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name"); - public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc"); - public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value"); - private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang"; - - public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME); - public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME); - public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH). - node(OUTER_LIST_QNAME).build(); - public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH). - node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build(); - public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two"); - public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three"); - - - public static final InputStream getDatastoreTestInputStream() { - return getInputStream(DATASTORE_TEST_YANG); - } - - private static InputStream getInputStream(final String resourceName) { - return TestModel.class.getResourceAsStream(DATASTORE_TEST_YANG); - } - - public static SchemaContext createTestContext() { - YangParserImpl parser = new YangParserImpl(); - Set modules = parser.parseYangModelsFromStreams(Collections.singletonList(getDatastoreTestInputStream())); - return parser.resolveSchemaContext(modules); - } + public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13", + "test"); + + public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13", + "junk"); + + + public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list"); + public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list"); + public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice"); + public static final QName ID_QNAME = QName.create(TEST_QNAME, "id"); + public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name"); + public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc"); + public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value"); + private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang"; + + public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME); + public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME); + public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH). + node(OUTER_LIST_QNAME).build(); + public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH). + node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build(); + public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two"); + public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three"); + + + public static final InputStream getDatastoreTestInputStream() { + return TestModel.class.getResourceAsStream(DATASTORE_TEST_YANG); + } + + public static SchemaContext createTestContext() { + YangParserImpl parser = new YangParserImpl(); + try { + return parser.parseSources(Collections.singleton(Resources.asByteSource(TestModel.class.getResource(DATASTORE_TEST_YANG)))); + } catch (IOException | YangSyntaxErrorException e) { + throw new ExceptionInInitializerError(e); + } + } } diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataTreeListenerTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataTreeListenerTest.java new file mode 100644 index 0000000000..c3038ca709 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataTreeListenerTest.java @@ -0,0 +1,454 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; +import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION; +import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ForwardingExecutorService; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.md.sal.dom.store.impl.TestModel; +import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService; +import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +public class DOMDataTreeListenerTest { + + private SchemaContext schemaContext; + private AbstractDOMDataBroker domBroker; + private ListeningExecutorService executor; + private ExecutorService futureExecutor; + private CommitExecutorService commitExecutor; + + private static final DataContainerChild OUTER_LIST = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) + .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)) + .build(); + + private static final DataContainerChild OUTER_LIST_2 = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) + .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)) + .build(); + + private static final NormalizedNode TEST_CONTAINER = Builders.containerBuilder() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)) + .withChild(OUTER_LIST) + .build(); + + private static final NormalizedNode TEST_CONTAINER_2 = Builders.containerBuilder() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)) + .withChild(OUTER_LIST_2) + .build(); + + private static DOMDataTreeIdentifier ROOT_DATA_TREE_ID = new DOMDataTreeIdentifier( + LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); + + private static DOMDataTreeIdentifier OUTER_LIST_DATA_TREE_ID = new DOMDataTreeIdentifier( + LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_LIST_PATH); + + @Before + public void setupStore() { + InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", + MoreExecutors.newDirectExecutorService()); + InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", + MoreExecutors.newDirectExecutorService()); + schemaContext = TestModel.createTestContext(); + + operStore.onGlobalContextUpdated(schemaContext); + configStore.onGlobalContextUpdated(schemaContext); + + ImmutableMap stores = ImmutableMap.builder() // + .put(CONFIGURATION, configStore) // + .put(OPERATIONAL, operStore) // + .build(); + + commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor()); + futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB"); + executor = new DeadlockDetectingListeningExecutorService(commitExecutor, + TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor); + domBroker = new SerializedDOMDataBroker(stores, executor); + } + + @After + public void tearDown() { + if (executor != null) { + executor.shutdownNow(); + } + + if (futureExecutor != null) { + futureExecutor.shutdownNow(); + } + } + + @Test + public void writeContainerEmptyTreeTest() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + + DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService(); + assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!", + dataTreeChangeService); + + final TestDataTreeListener listener = new TestDataTreeListener(latch); + final ListenerRegistration listenerReg = + dataTreeChangeService.registerDataTreeChangeListener(ROOT_DATA_TREE_ID, listener); + + final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER); + writeTx.submit(); + + latch.await(5, TimeUnit.SECONDS); + + assertEquals(1, listener.getReceivedChanges().size()); + final Collection changes = listener.getReceivedChanges().get(0); + assertEquals(1, changes.size()); + + DataTreeCandidate candidate = changes.iterator().next(); + assertNotNull(candidate); + DataTreeCandidateNode candidateRoot = candidate.getRootNode(); + checkChange(null, TEST_CONTAINER, ModificationType.WRITE, candidateRoot); + listenerReg.close(); + } + + @Test + public void replaceContainerContainerInTreeTest() throws InterruptedException, TransactionCommitFailedException { + CountDownLatch latch = new CountDownLatch(2); + + DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService(); + assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!", + dataTreeChangeService); + + DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER); + writeTx.submit().checkedGet(); + + final TestDataTreeListener listener = new TestDataTreeListener(latch); + final ListenerRegistration listenerReg = + dataTreeChangeService.registerDataTreeChangeListener(ROOT_DATA_TREE_ID, listener); + writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER_2); + writeTx.submit(); + + latch.await(5, TimeUnit.SECONDS); + + assertEquals(2, listener.getReceivedChanges().size()); + Collection changes = listener.getReceivedChanges().get(0); + assertEquals(1, changes.size()); + + DataTreeCandidate candidate = changes.iterator().next(); + assertNotNull(candidate); + DataTreeCandidateNode candidateRoot = candidate.getRootNode(); + checkChange(null, TEST_CONTAINER, ModificationType.WRITE, candidateRoot); + + changes = listener.getReceivedChanges().get(1); + assertEquals(1, changes.size()); + + candidate = changes.iterator().next(); + assertNotNull(candidate); + candidateRoot = candidate.getRootNode(); + checkChange(TEST_CONTAINER, TEST_CONTAINER_2, ModificationType.WRITE, candidateRoot); + listenerReg.close(); + } + + @Test + public void deleteContainerContainerInTreeTest() throws InterruptedException, TransactionCommitFailedException { + CountDownLatch latch = new CountDownLatch(2); + + DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService(); + assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!", + dataTreeChangeService); + + DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER); + writeTx.submit().checkedGet(); + + final TestDataTreeListener listener = new TestDataTreeListener(latch); + final ListenerRegistration listenerReg = + dataTreeChangeService.registerDataTreeChangeListener(ROOT_DATA_TREE_ID, listener); + + writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.delete(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); + writeTx.submit(); + + latch.await(5, TimeUnit.SECONDS); + + assertEquals(2, listener.getReceivedChanges().size()); + Collection changes = listener.getReceivedChanges().get(0); + assertEquals(1, changes.size()); + + DataTreeCandidate candidate = changes.iterator().next(); + assertNotNull(candidate); + DataTreeCandidateNode candidateRoot = candidate.getRootNode(); + checkChange(null, TEST_CONTAINER, ModificationType.WRITE, candidateRoot); + + changes = listener.getReceivedChanges().get(1); + assertEquals(1, changes.size()); + + candidate = changes.iterator().next(); + assertNotNull(candidate); + candidateRoot = candidate.getRootNode(); + checkChange(TEST_CONTAINER, null, ModificationType.DELETE, candidateRoot); + listenerReg.close(); + } + + @Test + public void replaceChildListContainerInTreeTest() throws InterruptedException, TransactionCommitFailedException { + CountDownLatch latch = new CountDownLatch(2); + + DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService(); + assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!", + dataTreeChangeService); + + DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER); + writeTx.submit().checkedGet(); + + final TestDataTreeListener listener = new TestDataTreeListener(latch); + final ListenerRegistration listenerReg = + dataTreeChangeService.registerDataTreeChangeListener(ROOT_DATA_TREE_ID, listener); + + writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_LIST_PATH, OUTER_LIST_2); + writeTx.submit(); + + latch.await(5, TimeUnit.SECONDS); + + assertEquals(2, listener.getReceivedChanges().size()); + Collection changes = listener.getReceivedChanges().get(0); + assertEquals(1, changes.size()); + + DataTreeCandidate candidate = changes.iterator().next(); + assertNotNull(candidate); + DataTreeCandidateNode candidateRoot = candidate.getRootNode(); + checkChange(null, TEST_CONTAINER, ModificationType.WRITE, candidateRoot); + + changes = listener.getReceivedChanges().get(1); + assertEquals(1, changes.size()); + + candidate = changes.iterator().next(); + assertNotNull(candidate); + candidateRoot = candidate.getRootNode(); + checkChange(TEST_CONTAINER, TEST_CONTAINER_2, ModificationType.SUBTREE_MODIFIED, candidateRoot); + final DataTreeCandidateNode modifiedChild = candidateRoot.getModifiedChild( + new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)); + assertNotNull(modifiedChild); + checkChange(OUTER_LIST, OUTER_LIST_2, ModificationType.WRITE, modifiedChild); + listenerReg.close(); + } + + @Test + public void rootModificationChildListenerTest() throws InterruptedException, TransactionCommitFailedException { + CountDownLatch latch = new CountDownLatch(2); + + DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService(); + assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!", + dataTreeChangeService); + + DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER); + writeTx.submit().checkedGet(); + + final TestDataTreeListener listener = new TestDataTreeListener(latch); + final ListenerRegistration listenerReg = + dataTreeChangeService.registerDataTreeChangeListener(OUTER_LIST_DATA_TREE_ID, listener); + + writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER_2); + writeTx.submit().checkedGet(); + + latch.await(1, TimeUnit.SECONDS); + + assertEquals(2, listener.getReceivedChanges().size()); + Collection changes = listener.getReceivedChanges().get(0); + assertEquals(1, changes.size()); + + DataTreeCandidate candidate = changes.iterator().next(); + assertNotNull(candidate); + DataTreeCandidateNode candidateRoot = candidate.getRootNode(); + checkChange(null, OUTER_LIST, ModificationType.WRITE, candidateRoot); + + changes = listener.getReceivedChanges().get(1); + assertEquals(1, changes.size()); + + candidate = changes.iterator().next(); + assertNotNull(candidate); + candidateRoot = candidate.getRootNode(); + checkChange(OUTER_LIST, OUTER_LIST_2, ModificationType.WRITE, candidateRoot); + listenerReg.close(); + } + + @Test + public void listEntryChangeNonRootRegistrationTest() throws InterruptedException, TransactionCommitFailedException { + CountDownLatch latch = new CountDownLatch(2); + + DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService(); + assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!", + dataTreeChangeService); + + DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER); + writeTx.submit().checkedGet(); + + final TestDataTreeListener listener = new TestDataTreeListener(latch); + final ListenerRegistration listenerReg = + dataTreeChangeService.registerDataTreeChangeListener(OUTER_LIST_DATA_TREE_ID, listener); + + final YangInstanceIdentifier.NodeIdentifierWithPredicates outerListEntryId1 = + new YangInstanceIdentifier.NodeIdentifierWithPredicates(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1); + final YangInstanceIdentifier.NodeIdentifierWithPredicates outerListEntryId2 = + new YangInstanceIdentifier.NodeIdentifierWithPredicates(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2); + final YangInstanceIdentifier.NodeIdentifierWithPredicates outerListEntryId3 = + new YangInstanceIdentifier.NodeIdentifierWithPredicates(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 3); + + final MapEntryNode outerListEntry1 = ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1); + final MapEntryNode outerListEntry2 = ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2); + final MapEntryNode outerListEntry3 = ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 3); + + final MapNode listAfter = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) + .withChild(outerListEntry2) + .withChild(outerListEntry3) + .build(); + + writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.delete(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_LIST_PATH.node(outerListEntryId1)); + writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_LIST_PATH.node(outerListEntryId2), + outerListEntry2); + writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_LIST_PATH.node(outerListEntryId3), + outerListEntry3); + writeTx.submit(); + + latch.await(5, TimeUnit.SECONDS); + + assertEquals(2, listener.getReceivedChanges().size()); + Collection changes = listener.getReceivedChanges().get(0); + assertEquals(1, changes.size()); + + DataTreeCandidate candidate = changes.iterator().next(); + assertNotNull(candidate); + DataTreeCandidateNode candidateRoot = candidate.getRootNode(); + checkChange(null, OUTER_LIST, ModificationType.WRITE, candidateRoot); + + changes = listener.getReceivedChanges().get(1); + assertEquals(1, changes.size()); + + candidate = changes.iterator().next(); + assertNotNull(candidate); + candidateRoot = candidate.getRootNode(); + checkChange(OUTER_LIST, listAfter, ModificationType.SUBTREE_MODIFIED, candidateRoot); + final DataTreeCandidateNode entry1Canditate = candidateRoot.getModifiedChild(outerListEntryId1); + checkChange(outerListEntry1, null, ModificationType.DELETE, entry1Canditate); + final DataTreeCandidateNode entry2Canditate = candidateRoot.getModifiedChild(outerListEntryId2); + checkChange(null, outerListEntry2, ModificationType.WRITE, entry2Canditate); + final DataTreeCandidateNode entry3Canditate = candidateRoot.getModifiedChild(outerListEntryId3); + checkChange(null, outerListEntry3, ModificationType.WRITE, entry3Canditate); + listenerReg.close(); + } + + private static void checkChange(NormalizedNode expectedBefore, + NormalizedNode expectedAfter, + ModificationType expectedMod, + DataTreeCandidateNode candidateNode) { + if (expectedBefore != null) { + assertTrue(candidateNode.getDataBefore().isPresent()); + assertEquals(expectedBefore, candidateNode.getDataBefore().get()); + } else { + assertFalse(candidateNode.getDataBefore().isPresent()); + } + + if (expectedAfter != null) { + assertTrue(candidateNode.getDataAfter().isPresent()); + assertEquals(expectedAfter, candidateNode.getDataAfter().get()); + } else { + assertFalse(candidateNode.getDataAfter().isPresent()); + } + + assertEquals(expectedMod, candidateNode.getModificationType()); + } + + private DOMDataTreeChangeService getDOMDataTreeChangeService() { + final DOMDataBrokerExtension extension = domBroker.getSupportedExtensions() + .get(DOMDataTreeChangeService.class); + if (extension == null) { + return null; + } + DOMDataTreeChangeService dataTreeChangeService = null; + if (extension instanceof DOMDataTreeChangeService) { + dataTreeChangeService = (DOMDataTreeChangeService) extension; + } + return dataTreeChangeService; + } + + + static class CommitExecutorService extends ForwardingExecutorService { + + ExecutorService delegate; + + public CommitExecutorService(final ExecutorService delegate) { + this.delegate = delegate; + } + + @Override + protected ExecutorService delegate() { + return delegate; + } + } + + static class TestDataTreeListener implements DOMDataTreeChangeListener { + + private List> receivedChanges = new ArrayList<>(); + private CountDownLatch latch; + + public TestDataTreeListener(final CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void onDataTreeChanged(@Nonnull final Collection changes) { + receivedChanges.add(changes); + latch.countDown(); + } + + public List> getReceivedChanges() { + return receivedChanges; + } + } +} diff --git a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractDOMStoreTransaction.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractDOMStoreTransaction.java index f043d2cb84..106abca3ec 100644 --- a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractDOMStoreTransaction.java +++ b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractDOMStoreTransaction.java @@ -49,7 +49,8 @@ public abstract class AbstractDOMStoreTransaction implements DOMStoreTransact * @return The context in which this transaction was allocated, or null * if the context was not recorded. */ - @Nullable public final Throwable getDebugContext() { + @Nullable + public final Throwable getDebugContext() { return debugContext; } diff --git a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractSnapshotBackedTransactionChain.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractSnapshotBackedTransactionChain.java new file mode 100644 index 0000000000..b7776b2a39 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractSnapshotBackedTransactionChain.java @@ -0,0 +1,270 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.sal.core.spi.data; + +import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; +import java.util.AbstractMap.SimpleEntry; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract implementation of the {@link DOMStoreTransactionChain} interface relying on {@link DataTreeSnapshot} supplier + * and backend commit coordinator. + * + * @param transaction identifier type + */ +@Beta +public abstract class AbstractSnapshotBackedTransactionChain extends TransactionReadyPrototype implements DOMStoreTransactionChain { + private static abstract class State { + /** + * Allocate a new snapshot. + * + * @return A new snapshot + */ + protected abstract DataTreeSnapshot getSnapshot(); + } + + private static final class Idle extends State { + private final AbstractSnapshotBackedTransactionChain chain; + + Idle(final AbstractSnapshotBackedTransactionChain chain) { + this.chain = Preconditions.checkNotNull(chain); + } + + @Override + protected DataTreeSnapshot getSnapshot() { + return chain.takeSnapshot(); + } + } + + /** + * We have a transaction out there. + */ + private static final class Allocated extends State { + private static final AtomicReferenceFieldUpdater SNAPSHOT_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(Allocated.class, DataTreeSnapshot.class, "snapshot"); + private final DOMStoreWriteTransaction transaction; + private volatile DataTreeSnapshot snapshot; + + Allocated(final DOMStoreWriteTransaction transaction) { + this.transaction = Preconditions.checkNotNull(transaction); + } + + public DOMStoreWriteTransaction getTransaction() { + return transaction; + } + + @Override + protected DataTreeSnapshot getSnapshot() { + final DataTreeSnapshot ret = snapshot; + Preconditions.checkState(ret != null, "Previous transaction %s is not ready yet", transaction.getIdentifier()); + return ret; + } + + void setSnapshot(final DataTreeSnapshot snapshot) { + final boolean success = SNAPSHOT_UPDATER.compareAndSet(this, null, snapshot); + Preconditions.checkState(success, "Transaction %s has already been marked as ready", transaction.getIdentifier()); + } + } + + /** + * Chain is logically shut down, no further allocation allowed. + */ + private static final class Shutdown extends State { + private final String message; + + Shutdown(final String message) { + this.message = Preconditions.checkNotNull(message); + } + + @Override + protected DataTreeSnapshot getSnapshot() { + throw new IllegalStateException(message); + } + } + + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(AbstractSnapshotBackedTransactionChain.class, State.class, "state"); + private static final Logger LOG = LoggerFactory.getLogger(AbstractSnapshotBackedTransactionChain.class); + private static final Shutdown CLOSED = new Shutdown("Transaction chain is closed"); + private static final Shutdown FAILED = new Shutdown("Transaction chain has failed"); + private final Idle idleState; + private volatile State state; + + protected AbstractSnapshotBackedTransactionChain() { + idleState = new Idle(this); + state = idleState; + } + + private Entry getSnapshot() { + final State localState = state; + return new SimpleEntry<>(localState, localState.getSnapshot()); + } + + private boolean recordTransaction(final State expected, final DOMStoreWriteTransaction transaction) { + final State state = new Allocated(transaction); + return STATE_UPDATER.compareAndSet(this, expected, state); + } + + @Override + public final DOMStoreReadTransaction newReadOnlyTransaction() { + final Entry entry = getSnapshot(); + return SnapshotBackedTransactions.newReadTransaction(nextTransactionIdentifier(), getDebugTransactions(), entry.getValue()); + } + + @Override + public final DOMStoreReadWriteTransaction newReadWriteTransaction() { + Entry entry; + DOMStoreReadWriteTransaction ret; + + do { + entry = getSnapshot(); + ret = new SnapshotBackedReadWriteTransaction(nextTransactionIdentifier(), + getDebugTransactions(), entry.getValue(), this); + } while (!recordTransaction(entry.getKey(), ret)); + + return ret; + } + + @Override + public final DOMStoreWriteTransaction newWriteOnlyTransaction() { + Entry entry; + DOMStoreWriteTransaction ret; + + do { + entry = getSnapshot(); + ret = new SnapshotBackedWriteTransaction(nextTransactionIdentifier(), + getDebugTransactions(), entry.getValue(), this); + } while (!recordTransaction(entry.getKey(), ret)); + + return ret; + } + + @Override + protected final void transactionAborted(final SnapshotBackedWriteTransaction tx) { + final State localState = state; + if (localState instanceof Allocated) { + final Allocated allocated = (Allocated)localState; + if (allocated.getTransaction().equals(tx)) { + final boolean success = STATE_UPDATER.compareAndSet(this, localState, idleState); + if (!success) { + LOG.warn("Transaction {} aborted, but chain {} state already transitioned from {} to {}, very strange", + tx, this, localState, state); + } + } + } + } + + @Override + protected final DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) { + final State localState = state; + + if (localState instanceof Allocated) { + final Allocated allocated = (Allocated)localState; + final DOMStoreWriteTransaction transaction = allocated.getTransaction(); + Preconditions.checkState(tx.equals(transaction), "Mis-ordered ready transaction %s last allocated was %s", tx, transaction); + allocated.setSnapshot(tree); + } else { + LOG.debug("Ignoring transaction {} readiness due to state {}", tx, localState); + } + + return createCohort(tx, tree); + } + + @Override + public final void close() { + final State localState = state; + + do { + Preconditions.checkState(!CLOSED.equals(localState), "Transaction chain {} has been closed", this); + + if (FAILED.equals(localState)) { + LOG.debug("Ignoring user close in failed state"); + return; + } + } while (!STATE_UPDATER.compareAndSet(this, localState, CLOSED)); + } + + /** + * Notify the base logic that a previously-submitted transaction has been committed successfully. + * + * @param transaction Transaction which completed successfully. + */ + protected final void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) { + // If the committed transaction was the one we allocated last, + // we clear it and the ready snapshot, so the next transaction + // allocated refers to the data tree directly. + final State localState = state; + + if (!(localState instanceof Allocated)) { + // This can legally happen if the chain is shut down before the transaction was committed + // by the backend. + LOG.debug("Ignoring successful transaction {} in state {}", transaction, localState); + return; + } + + final Allocated allocated = (Allocated)localState; + final DOMStoreWriteTransaction tx = allocated.getTransaction(); + if (!tx.equals(transaction)) { + LOG.debug("Ignoring non-latest successful transaction {} in state {}", transaction, allocated); + return; + } + + if (!STATE_UPDATER.compareAndSet(this, localState, idleState)) { + LOG.debug("Transaction chain {} has already transitioned from {} to {}, not making it idle", this, localState, state); + } + } + + /** + * Notify the base logic that a previously-submitted transaction has failed. + * + * @param transaction Transaction which failed. + * @param cause Failure cause + */ + protected final void onTransactionFailed(final SnapshotBackedWriteTransaction transaction, final Throwable cause) { + LOG.debug("Transaction chain {} failed on transaction {}", this, transaction, cause); + state = FAILED; + } + + /** + * Return the next transaction identifier. + * + * @return transaction identifier. + */ + protected abstract T nextTransactionIdentifier(); + + /** + * Inquire as to whether transactions should record their allocation context. + * + * @return True if allocation context should be recorded. + */ + protected abstract boolean getDebugTransactions(); + + /** + * Take a fresh {@link DataTreeSnapshot} from the backend. + * + * @return A new snapshot. + */ + protected abstract DataTreeSnapshot takeSnapshot(); + + /** + * Create a cohort for driving the transaction through the commit process. + * + * @param transaction Transaction handle + * @param modification {@link DataTreeModification} which needs to be applied to the backend + * @return A {@link DOMStoreThreePhaseCommitCohort} cohort. + */ + protected abstract DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction transaction, final DataTreeModification modification); +} diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadTransaction.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedReadTransaction.java similarity index 80% rename from opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadTransaction.java rename to opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedReadTransaction.java index 8b18be432a..8e5957c71a 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadTransaction.java +++ b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedReadTransaction.java @@ -5,16 +5,15 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.md.sal.dom.store.impl; +package org.opendaylight.controller.sal.core.spi.data; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.annotations.Beta; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; @@ -28,14 +27,21 @@ import org.slf4j.LoggerFactory; * Implementation of read-only transaction backed by {@link DataTreeSnapshot} * which delegates most of its calls to similar methods provided by underlying snapshot. * + * identifier type */ -final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction - implements DOMStoreReadTransaction { - +@Beta +public final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction implements DOMStoreReadTransaction { private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadTransaction.class); private volatile DataTreeSnapshot stableSnapshot; - public SnapshotBackedReadTransaction(final Object identifier, final boolean debug, final DataTreeSnapshot snapshot) { + /** + * Creates a new read-only transaction. + * + * @param identifier Transaction Identifier + * @param debug Enable transaction debugging + * @param snapshot Snapshot which will be modified. + */ + SnapshotBackedReadTransaction(final T identifier, final boolean debug, final DataTreeSnapshot snapshot) { super(identifier, debug); this.stableSnapshot = Preconditions.checkNotNull(snapshot); LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot); @@ -71,8 +77,7 @@ final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction identifier type */ -final class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction implements DOMStoreReadWriteTransaction { +@Beta +public final class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction implements DOMStoreReadWriteTransaction { private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadWriteTransaction.class); - /** - * Creates new read-write transaction. - * - * @param identifier transaction Identifier - * @param snapshot Snapshot which will be modified. - * @param readyImpl Implementation of ready method. - */ - protected SnapshotBackedReadWriteTransaction(final Object identifier, final boolean debug, - final DataTreeSnapshot snapshot, final TransactionReadyPrototype store) { - super(identifier, debug, snapshot, store); + SnapshotBackedReadWriteTransaction(final T identifier, final boolean debug, + final DataTreeSnapshot snapshot, final TransactionReadyPrototype readyImpl) { + super(identifier, debug, snapshot, readyImpl); } @Override diff --git a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedTransactions.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedTransactions.java new file mode 100644 index 0000000000..3368c8aee1 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedTransactions.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.sal.core.spi.data; + +import com.google.common.annotations.Beta; +import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; + +/** + * Public utility class for instantiating snapshot-backed transactions. + */ +@Beta +public final class SnapshotBackedTransactions { + private SnapshotBackedTransactions() { + throw new UnsupportedOperationException("Utility class"); + } + + /** + * Creates a new read-only transaction. + * + * @param identifier Transaction Identifier + * @param debug Enable transaction debugging + * @param snapshot Snapshot which will be modified. + */ + public static SnapshotBackedReadTransaction newReadTransaction(final T identifier, final boolean debug, final DataTreeSnapshot snapshot) { + return new SnapshotBackedReadTransaction(identifier, debug, snapshot); + } + + /** + * Creates a new read-write transaction. + * + * @param identifier transaction Identifier + * @param debug Enable transaction debugging + * @param snapshot Snapshot which will be modified. + * @param readyImpl Implementation of ready method. + */ + public static SnapshotBackedReadWriteTransaction newReadWriteTransaction(final T identifier, final boolean debug, + final DataTreeSnapshot snapshot, final TransactionReadyPrototype readyImpl) { + return new SnapshotBackedReadWriteTransaction(identifier, debug, snapshot, readyImpl); + } + + /** + * Creates a new write-only transaction. + * + * @param identifier transaction Identifier + * @param debug Enable transaction debugging + * @param snapshot Snapshot which will be modified. + * @param readyImpl Implementation of ready method. + */ + public static SnapshotBackedWriteTransaction newWriteTransaction(final T identifier, final boolean debug, + final DataTreeSnapshot snapshot, final TransactionReadyPrototype readyImpl) { + return new SnapshotBackedWriteTransaction(identifier, debug, snapshot, readyImpl); + } +} diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedWriteTransaction.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedWriteTransaction.java similarity index 81% rename from opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedWriteTransaction.java rename to opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedWriteTransaction.java index 10e3a7df10..a02d768370 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedWriteTransaction.java +++ b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedWriteTransaction.java @@ -5,17 +5,15 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.md.sal.dom.store.impl; +package org.opendaylight.controller.sal.core.spi.data; import static com.google.common.base.Preconditions.checkState; +import com.google.common.annotations.Beta; import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; @@ -26,33 +24,27 @@ import org.slf4j.LoggerFactory; /** * Implementation of Write transaction which is backed by * {@link DataTreeSnapshot} and executed according to - * {@link org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype}. + * {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype}. * + * @param Identifier type */ -class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction implements DOMStoreWriteTransaction { +@Beta +public class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction implements DOMStoreWriteTransaction { private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedWriteTransaction.class); + @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater READY_UPDATER = AtomicReferenceFieldUpdater.newUpdater(SnapshotBackedWriteTransaction.class, TransactionReadyPrototype.class, "readyImpl"); + @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater TREE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(SnapshotBackedWriteTransaction.class, DataTreeModification.class, "mutableTree"); // non-null when not ready - private volatile TransactionReadyPrototype readyImpl; + private volatile TransactionReadyPrototype readyImpl; // non-null when not committed/closed private volatile DataTreeModification mutableTree; - /** - * Creates new write-only transaction. - * - * @param identifier - * transaction Identifier - * @param snapshot - * Snapshot which will be modified. - * @param readyImpl - * Implementation of ready method. - */ - public SnapshotBackedWriteTransaction(final Object identifier, final boolean debug, - final DataTreeSnapshot snapshot, final TransactionReadyPrototype readyImpl) { + SnapshotBackedWriteTransaction(final T identifier, final boolean debug, + final DataTreeSnapshot snapshot, final TransactionReadyPrototype readyImpl) { super(identifier, debug); this.readyImpl = Preconditions.checkNotNull(readyImpl, "readyImpl must not be null."); mutableTree = snapshot.newModification(); @@ -126,7 +118,7 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction * @param path Path to read * @return null if the the transaction has been closed; */ - protected final Optional> readSnapshotNode(final YangInstanceIdentifier path) { + final Optional> readSnapshotNode(final YangInstanceIdentifier path) { return readyImpl == null ? null : mutableTree.readNode(path); } @@ -136,7 +128,8 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction @Override public DOMStoreThreePhaseCommitCohort ready() { - final TransactionReadyPrototype wasReady = READY_UPDATER.getAndSet(this, null); + @SuppressWarnings("unchecked") + final TransactionReadyPrototype wasReady = READY_UPDATER.getAndSet(this, null); checkState(wasReady != null, "Transaction %s is no longer open", getIdentifier()); LOG.debug("Store transaction: {} : Ready", getIdentifier()); @@ -149,7 +142,8 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction @Override public void close() { - final TransactionReadyPrototype wasReady = READY_UPDATER.getAndSet(this, null); + @SuppressWarnings("unchecked") + final TransactionReadyPrototype wasReady = READY_UPDATER.getAndSet(this, null); if (wasReady != null) { LOG.debug("Store transaction: {} : Closed", getIdentifier()); TREE_UPDATER.lazySet(this, null); @@ -166,21 +160,22 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction /** * Prototype implementation of - * {@link #ready(org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction)} + * {@link #ready(org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction)} * * This class is intended to be implemented by Transaction factories - * responsible for allocation of {@link org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction} and + * responsible for allocation of {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction} and * providing underlying logic for applying implementation. * + * @param identifier type */ - abstract static class TransactionReadyPrototype { + public abstract static class TransactionReadyPrototype { /** * Called when a transaction is closed without being readied. This is not invoked for * transactions which are ready. * * @param tx Transaction which got aborted. */ - protected abstract void transactionAborted(final SnapshotBackedWriteTransaction tx); + protected abstract void transactionAborted(final SnapshotBackedWriteTransaction tx); /** * Returns a commit coordinator associated with supplied transactions. @@ -193,6 +188,6 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction * Modified data tree which has been constructed. * @return DOMStoreThreePhaseCommitCohort associated with transaction */ - protected abstract DOMStoreThreePhaseCommitCohort transactionReady(SnapshotBackedWriteTransaction tx, DataTreeModification tree); + protected abstract DOMStoreThreePhaseCommitCohort transactionReady(SnapshotBackedWriteTransaction tx, DataTreeModification tree); } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChainedTransactionCommitImpl.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChainedTransactionCommitImpl.java index 05e3d5cb26..35d891dac0 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChainedTransactionCommitImpl.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChainedTransactionCommitImpl.java @@ -8,44 +8,24 @@ package org.opendaylight.controller.md.sal.dom.store.impl; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -final class ChainedTransactionCommitImpl extends ForwardingDOMStoreThreePhaseCommitCohort { - private final SnapshotBackedWriteTransaction transaction; - private final DOMStoreThreePhaseCommitCohort delegate; +final class ChainedTransactionCommitImpl extends InMemoryDOMStoreThreePhaseCommitCohort { private final DOMStoreTransactionChainImpl txChain; - ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction, - final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) { - this.transaction = Preconditions.checkNotNull(transaction); - this.delegate = Preconditions.checkNotNull(delegate); + ChainedTransactionCommitImpl(final InMemoryDOMDataStore store, final SnapshotBackedWriteTransaction transaction, + final DataTreeModification modification, final DOMStoreTransactionChainImpl txChain) { + super(store, transaction, modification); this.txChain = Preconditions.checkNotNull(txChain); } - @Override - protected DOMStoreThreePhaseCommitCohort delegate() { - return delegate; - } - @Override public ListenableFuture commit() { - ListenableFuture commitFuture = super.commit(); - Futures.addCallback(commitFuture, new FutureCallback() { - @Override - public void onFailure(final Throwable t) { - txChain.onTransactionFailed(transaction, t); - } - - @Override - public void onSuccess(final Void result) { - txChain.onTransactionCommited(transaction); - } - }); - return commitFuture; + ListenableFuture ret = super.commit(); + txChain.transactionCommited(getTransaction()); + return ret; } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMStoreTransactionChainImpl.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMStoreTransactionChainImpl.java index 3f731cf18b..2cf79d899b 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMStoreTransactionChainImpl.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMStoreTransactionChainImpl.java @@ -8,217 +8,40 @@ package org.opendaylight.controller.md.sal.dom.store.impl; import com.google.common.base.Preconditions; -import java.util.AbstractMap.SimpleEntry; -import java.util.Map.Entry; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.AbstractSnapshotBackedTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -final class DOMStoreTransactionChainImpl extends TransactionReadyPrototype implements DOMStoreTransactionChain { - private static abstract class State { - /** - * Allocate a new snapshot. - * - * @return A new snapshot - */ - protected abstract DataTreeSnapshot getSnapshot(); - } - - private static final class Idle extends State { - private final InMemoryDOMDataStore store; - - Idle(final InMemoryDOMDataStore store) { - this.store = Preconditions.checkNotNull(store); - } - - @Override - protected DataTreeSnapshot getSnapshot() { - return store.takeSnapshot(); - } - } - - /** - * We have a transaction out there. - */ - private static final class Allocated extends State { - private static final AtomicReferenceFieldUpdater SNAPSHOT_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(Allocated.class, DataTreeSnapshot.class, "snapshot"); - private final DOMStoreWriteTransaction transaction; - private volatile DataTreeSnapshot snapshot; - - Allocated(final DOMStoreWriteTransaction transaction) { - this.transaction = Preconditions.checkNotNull(transaction); - } - - public DOMStoreWriteTransaction getTransaction() { - return transaction; - } - - @Override - protected DataTreeSnapshot getSnapshot() { - final DataTreeSnapshot ret = snapshot; - Preconditions.checkState(ret != null, "Previous transaction %s is not ready yet", transaction.getIdentifier()); - return ret; - } - - void setSnapshot(final DataTreeSnapshot snapshot) { - final boolean success = SNAPSHOT_UPDATER.compareAndSet(this, null, snapshot); - Preconditions.checkState(success, "Transaction %s has already been marked as ready", transaction.getIdentifier()); - } - } - - /** - * Chain is logically shut down, no further allocation allowed. - */ - private static final class Shutdown extends State { - private final String message; - - Shutdown(final String message) { - this.message = Preconditions.checkNotNull(message); - } - - @Override - protected DataTreeSnapshot getSnapshot() { - throw new IllegalStateException(message); - } - } - - private static final AtomicReferenceFieldUpdater STATE_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(DOMStoreTransactionChainImpl.class, State.class, "state"); - private static final Logger LOG = LoggerFactory.getLogger(DOMStoreTransactionChainImpl.class); - private static final Shutdown CLOSED = new Shutdown("Transaction chain is closed"); - private static final Shutdown FAILED = new Shutdown("Transaction chain has failed"); +final class DOMStoreTransactionChainImpl extends AbstractSnapshotBackedTransactionChain { private final InMemoryDOMDataStore store; - private final Idle idleState; - private volatile State state; DOMStoreTransactionChainImpl(final InMemoryDOMDataStore store) { this.store = Preconditions.checkNotNull(store); - idleState = new Idle(store); - state = idleState; - } - - private Entry getSnapshot() { - final State localState = state; - return new SimpleEntry<>(localState, localState.getSnapshot()); - } - - private boolean recordTransaction(final State expected, final DOMStoreWriteTransaction transaction) { - final State state = new Allocated(transaction); - return STATE_UPDATER.compareAndSet(this, expected, state); } @Override - public DOMStoreReadTransaction newReadOnlyTransaction() { - final Entry entry = getSnapshot(); - return new SnapshotBackedReadTransaction(store.nextIdentifier(), store.getDebugTransactions(), entry.getValue()); + protected DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction tx, final DataTreeModification modification) { + return new ChainedTransactionCommitImpl(store, tx, modification, this); } @Override - public DOMStoreReadWriteTransaction newReadWriteTransaction() { - Entry entry; - DOMStoreReadWriteTransaction ret; - - do { - entry = getSnapshot(); - ret = new SnapshotBackedReadWriteTransaction(store.nextIdentifier(), - store.getDebugTransactions(), entry.getValue(), this); - } while (!recordTransaction(entry.getKey(), ret)); - - return ret; - } - - @Override - public DOMStoreWriteTransaction newWriteOnlyTransaction() { - Entry entry; - DOMStoreWriteTransaction ret; - - do { - entry = getSnapshot(); - ret = new SnapshotBackedWriteTransaction(store.nextIdentifier(), - store.getDebugTransactions(), entry.getValue(), this); - } while (!recordTransaction(entry.getKey(), ret)); - - return ret; + protected DataTreeSnapshot takeSnapshot() { + return store.takeSnapshot(); } @Override - protected void transactionAborted(final SnapshotBackedWriteTransaction tx) { - final State localState = state; - if (localState instanceof Allocated) { - final Allocated allocated = (Allocated)localState; - if (allocated.getTransaction().equals(tx)) { - final boolean success = STATE_UPDATER.compareAndSet(this, localState, idleState); - if (!success) { - LOG.info("State already transitioned from {} to {}", localState, state); - } - } - } + protected String nextTransactionIdentifier() { + return store.nextIdentifier(); } @Override - protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) { - final State localState = state; - - if (localState instanceof Allocated) { - final Allocated allocated = (Allocated)localState; - final DOMStoreWriteTransaction transaction = allocated.getTransaction(); - Preconditions.checkState(tx.equals(transaction), "Mis-ordered ready transaction %s last allocated was %s", tx, transaction); - allocated.setSnapshot(tree); - } else { - LOG.debug("Ignoring transaction {} readiness due to state {}", tx, localState); - } - - return new ChainedTransactionCommitImpl(tx, store.transactionReady(tx, tree), this); + protected boolean getDebugTransactions() { + return store.getDebugTransactions(); } - @Override - public void close() { - final State localState = state; - - do { - Preconditions.checkState(!CLOSED.equals(localState), "Transaction chain {} has been closed", this); - - if (FAILED.equals(localState)) { - LOG.debug("Ignoring user close in failed state"); - return; - } - } while (!STATE_UPDATER.compareAndSet(this, localState, CLOSED)); - } - - void onTransactionFailed(final SnapshotBackedWriteTransaction transaction, final Throwable t) { - LOG.debug("Transaction chain {} failed on transaction {}", this, transaction, t); - state = FAILED; - } - - void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) { - // If the committed transaction was the one we allocated last, - // we clear it and the ready snapshot, so the next transaction - // allocated refers to the data tree directly. - final State localState = state; - - if (!(localState instanceof Allocated)) { - LOG.debug("Ignoring successful transaction {} in state {}", transaction, localState); - return; - } - - final Allocated allocated = (Allocated)localState; - final DOMStoreWriteTransaction tx = allocated.getTransaction(); - if (!tx.equals(transaction)) { - LOG.debug("Ignoring non-latest successful transaction {} in state {}", transaction, allocated); - return; - } - - if (!STATE_UPDATER.compareAndSet(this, localState, idleState)) { - LOG.debug("Transaction chain {} has already transitioned from {} to {}, not making it idle", this, localState, state); - } + void transactionCommited(final SnapshotBackedWriteTransaction transaction) { + super.onTransactionCommited(transaction); } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java index 354abcf69f..a85d8ac3fb 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java @@ -7,22 +7,15 @@ */ package org.opendaylight.controller.md.sal.dom.store.impl; -import static com.google.common.base.Preconditions.checkState; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype; import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree; -import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; @@ -30,6 +23,9 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions; +import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype; import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.concepts.ListenerRegistration; @@ -38,7 +34,6 @@ import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; @@ -55,14 +50,12 @@ import org.slf4j.LoggerFactory; * * Implementation of {@link DOMStore} which uses {@link DataTree} and other * classes such as {@link SnapshotBackedWriteTransaction}. - * {@link SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask} + * {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask} * to implement {@link DOMStore} contract. * */ -public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher { +public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher { private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class); - private static final ListenableFuture SUCCESSFUL_FUTURE = Futures.immediateFuture(null); - private static final ListenableFuture CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE); private static final Invoker, DOMImmutableDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER = new Invoker, DOMImmutableDataChangeEvent>() { @@ -120,17 +113,17 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - return new SnapshotBackedReadTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot()); + return SnapshotBackedTransactions.newReadTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot()); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new SnapshotBackedReadWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this); + return SnapshotBackedTransactions.newReadWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new SnapshotBackedWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this); + return SnapshotBackedTransactions.newWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this); } @Override @@ -214,99 +207,31 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D } @Override - protected void transactionAborted(final SnapshotBackedWriteTransaction tx) { + protected void transactionAborted(final SnapshotBackedWriteTransaction tx) { LOG.debug("Tx: {} is closed.", tx.getIdentifier()); } @Override - protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) { - LOG.debug("Tx: {} is submitted. Modifications: {}", tx.getIdentifier(), tree); - return new ThreePhaseCommitImpl(tx, tree); + protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification modification) { + LOG.debug("Tx: {} is submitted. Modifications: {}", tx.getIdentifier(), modification); + return new InMemoryDOMStoreThreePhaseCommitCohort(this, tx, modification); } - Object nextIdentifier() { + String nextIdentifier() { return name + "-" + txCounter.getAndIncrement(); } - private static void warnDebugContext(final AbstractDOMStoreTransaction transaction) { - final Throwable ctx = transaction.getDebugContext(); - if (ctx != null) { - LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx); - } + void validate(final DataTreeModification modification) throws DataValidationFailedException { + dataTree.validate(modification); } - private final class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort { - private final SnapshotBackedWriteTransaction transaction; - private final DataTreeModification modification; - - private ResolveDataChangeEventsTask listenerResolver; - private DataTreeCandidate candidate; - - public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction, final DataTreeModification modification) { - this.transaction = writeTransaction; - this.modification = modification; - } - - @Override - public ListenableFuture canCommit() { - try { - dataTree.validate(modification); - LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier()); - return CAN_COMMIT_FUTURE; - } catch (ConflictingModificationAppliedException e) { - LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(), - e.getPath()); - warnDebugContext(transaction); - return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e)); - } catch (DataValidationFailedException e) { - LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(), - e.getPath(), e); - warnDebugContext(transaction); - - // For debugging purposes, allow dumping of the modification. Coupled with the above - // precondition log, it should allow us to understand what went on. - LOG.trace("Store Tx: {} modifications: {} tree: {}", modification, dataTree); - - return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e)); - } catch (Exception e) { - LOG.warn("Unexpected failure in validation phase", e); - return Futures.immediateFailedFuture(e); - } - } - - @Override - public ListenableFuture preCommit() { - try { - candidate = dataTree.prepare(modification); - listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree); - return SUCCESSFUL_FUTURE; - } catch (Exception e) { - LOG.warn("Unexpected failure in pre-commit phase", e); - return Futures.immediateFailedFuture(e); - } - } - - @Override - public ListenableFuture abort() { - candidate = null; - return SUCCESSFUL_FUTURE; - } - - @Override - public ListenableFuture commit() { - checkState(candidate != null, "Proposed subtree must be computed"); - - /* - * The commit has to occur atomically with regard to listener - * registrations. - */ - synchronized (InMemoryDOMDataStore.this) { - dataTree.commit(candidate); - changePublisher.publishChange(candidate); - listenerResolver.resolve(dataChangeListenerNotificationManager); - } + DataTreeCandidate prepare(final DataTreeModification modification) { + return dataTree.prepare(modification); + } - return SUCCESSFUL_FUTURE; - } + synchronized void commit(final DataTreeCandidate candidate) { + dataTree.commit(candidate); + changePublisher.publishChange(candidate); + ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(dataChangeListenerNotificationManager); } } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreThreePhaseCommitCohort.java new file mode 100644 index 0000000000..dba71bff4c --- /dev/null +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreThreePhaseCommitCohort.java @@ -0,0 +1,100 @@ +package org.opendaylight.controller.md.sal.dom.store.impl; + +import static com.google.common.base.Preconditions.checkState; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class InMemoryDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort { + private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMStoreThreePhaseCommitCohort.class); + private static final ListenableFuture SUCCESSFUL_FUTURE = Futures.immediateFuture(null); + private static final ListenableFuture CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE); + private final SnapshotBackedWriteTransaction transaction; + private final DataTreeModification modification; + private final InMemoryDOMDataStore store; + private DataTreeCandidate candidate; + + public InMemoryDOMStoreThreePhaseCommitCohort(final InMemoryDOMDataStore store, final SnapshotBackedWriteTransaction writeTransaction, final DataTreeModification modification) { + this.transaction = Preconditions.checkNotNull(writeTransaction); + this.modification = Preconditions.checkNotNull(modification); + this.store = Preconditions.checkNotNull(store); + } + + private static void warnDebugContext(final AbstractDOMStoreTransaction transaction) { + final Throwable ctx = transaction.getDebugContext(); + if (ctx != null) { + LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx); + } + } + + @Override + public final ListenableFuture canCommit() { + try { + store.validate(modification); + LOG.debug("Store Transaction: {} can be committed", getTransaction().getIdentifier()); + return CAN_COMMIT_FUTURE; + } catch (ConflictingModificationAppliedException e) { + LOG.warn("Store Tx: {} Conflicting modification for {}.", getTransaction().getIdentifier(), + e.getPath()); + warnDebugContext(getTransaction()); + return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e)); + } catch (DataValidationFailedException e) { + LOG.warn("Store Tx: {} Data Precondition failed for {}.", getTransaction().getIdentifier(), + e.getPath(), e); + warnDebugContext(getTransaction()); + + // For debugging purposes, allow dumping of the modification. Coupled with the above + // precondition log, it should allow us to understand what went on. + LOG.trace("Store Tx: {} modifications: {} tree: {}", modification, store); + + return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e)); + } catch (Exception e) { + LOG.warn("Unexpected failure in validation phase", e); + return Futures.immediateFailedFuture(e); + } + } + + @Override + public final ListenableFuture preCommit() { + try { + candidate = store.prepare(modification); + return SUCCESSFUL_FUTURE; + } catch (Exception e) { + LOG.warn("Unexpected failure in pre-commit phase", e); + return Futures.immediateFailedFuture(e); + } + } + + @Override + public final ListenableFuture abort() { + candidate = null; + return SUCCESSFUL_FUTURE; + } + + protected final SnapshotBackedWriteTransaction getTransaction() { + return transaction; + } + + @Override + public ListenableFuture commit() { + checkState(candidate != null, "Proposed subtree must be computed"); + + /* + * The commit has to occur atomically with regard to listener + * registrations. + */ + store.commit(candidate); + return SUCCESSFUL_FUTURE; + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java index 9de4892d91..568f88376c 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java @@ -21,12 +21,13 @@ import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions; +import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; @@ -37,7 +38,6 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.model.api.SchemaContext; - public class InMemoryDataStoreTest { private SchemaContext schemaContext; @@ -268,7 +268,7 @@ public class InMemoryDataStoreTest { Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockSnapshot ) .readNode( Mockito.any( YangInstanceIdentifier.class ) ); - DOMStoreReadTransaction readTx = new SnapshotBackedReadTransaction("1", true, mockSnapshot); + DOMStoreReadTransaction readTx = SnapshotBackedTransactions.newReadTransaction("1", true, mockSnapshot); doReadAndThrowEx( readTx ); } @@ -292,14 +292,14 @@ public class InMemoryDataStoreTest { Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockModification ) .readNode( Mockito.any( YangInstanceIdentifier.class ) ); Mockito.doReturn( mockModification ).when( mockSnapshot ).newModification(); - TransactionReadyPrototype mockReady = Mockito.mock( TransactionReadyPrototype.class ); - DOMStoreReadTransaction readTx = new SnapshotBackedReadWriteTransaction("1", false, mockSnapshot, mockReady); + @SuppressWarnings("unchecked") + TransactionReadyPrototype mockReady = Mockito.mock( TransactionReadyPrototype.class ); + DOMStoreReadTransaction readTx = SnapshotBackedTransactions.newReadWriteTransaction("1", false, mockSnapshot, mockReady); doReadAndThrowEx( readTx ); } - private void doReadAndThrowEx( final DOMStoreReadTransaction readTx ) throws Throwable { - + private static void doReadAndThrowEx( final DOMStoreReadTransaction readTx ) throws Throwable { try { readTx.read(TestModel.TEST_PATH).get(); } catch( ExecutionException e ) { diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java index e11cac2eb3..10399ffeff 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java @@ -27,6 +27,9 @@ import org.opendaylight.controller.sal.restconf.impl.NormalizedNodeContext; import org.opendaylight.controller.sal.restconf.impl.RestconfDocumentedException; import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorTag; import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorType; +import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode; +import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; @@ -84,9 +87,18 @@ public class JsonNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPr final JsonReader reader = new JsonReader(new InputStreamReader(entityStream)); jsonParser.parse(reader); - final NormalizedNode partialResult = resultHolder.getResult(); + NormalizedNode partialResult = resultHolder.getResult(); final NormalizedNode result; - if(partialResult instanceof MapNode) { + + // unwrap result from augmentation and choice nodes on PUT + if (!isPost()) { + while (partialResult instanceof AugmentationNode || partialResult instanceof ChoiceNode) { + final Object childNode = ((DataContainerNode) partialResult).getValue().iterator().next(); + partialResult = (NormalizedNode) childNode; + } + } + + if (partialResult instanceof MapNode) { result = Iterables.getOnlyElement(((MapNode) partialResult).getValue()); } else { result = partialResult; diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/XmlNormalizedNodeBodyReader.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/XmlNormalizedNodeBodyReader.java index 4257e172b4..74a9bd2d31 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/XmlNormalizedNodeBodyReader.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/XmlNormalizedNodeBodyReader.java @@ -34,6 +34,8 @@ import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorType; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlUtils; import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.parser.DomToNormalizedNodeParserFactory; +import org.opendaylight.yangtools.yang.model.api.AugmentationSchema; +import org.opendaylight.yangtools.yang.model.api.AugmentationTarget; import org.opendaylight.yangtools.yang.model.api.ChoiceCaseNode; import org.opendaylight.yangtools.yang.model.api.ChoiceSchemaNode; import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode; @@ -126,18 +128,25 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro final String docRootElm = doc.getDocumentElement().getLocalName(); final String schemaNodeName = pathContext.getSchemaNode().getQName().getLocalName(); + // FIXME the factory instance should be cached if the schema context is the same + final DomToNormalizedNodeParserFactory parserFactory = + DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, pathContext.getSchemaContext()); + if (!schemaNodeName.equalsIgnoreCase(docRootElm)) { final DataSchemaNode foundSchemaNode = findSchemaNodeOrParentChoiceByName(schemaNode, docRootElm); if (foundSchemaNode != null) { + if (schemaNode instanceof AugmentationTarget) { + final AugmentationSchema augmentSchemaNode = findCorrespondingAugment(schemaNode, foundSchemaNode); + if (augmentSchemaNode != null) { + return parserFactory.getAugmentationNodeParser().parse(elements, augmentSchemaNode); + } + } schemaNode = foundSchemaNode; } } - // FIXME the factory instance should be cached if the schema context is the same - final DomToNormalizedNodeParserFactory parserFactory = - DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, pathContext.getSchemaContext()); - NormalizedNode parsed = null; + if(schemaNode instanceof ContainerSchemaNode) { return parserFactory.getContainerNodeParser().parse(Collections.singletonList(doc.getDocumentElement()), (ContainerSchemaNode) schemaNode); } else if(schemaNode instanceof ListSchemaNode) { @@ -147,7 +156,6 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro final ChoiceSchemaNode casted = (ChoiceSchemaNode) schemaNode; return parserFactory.getChoiceNodeParser().parse(elements, casted); } - // FIXME : add another DataSchemaNode extensions e.g. LeafSchemaNode return parsed; @@ -175,5 +183,17 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro } return null; } + + private static AugmentationSchema findCorrespondingAugment(final DataSchemaNode parent, final DataSchemaNode child) { + if (parent instanceof AugmentationTarget && !((parent instanceof ChoiceCaseNode) || (parent instanceof ChoiceSchemaNode))) { + for (AugmentationSchema augmentation : ((AugmentationTarget) parent).getAvailableAugmentations()) { + DataSchemaNode childInAugmentation = augmentation.getDataChildByName(child.getQName()); + if (childInAugmentation != null) { + return augmentation; + } + } + } + return null; + } } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java index 33795889a1..8e88be6f50 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java @@ -828,12 +828,13 @@ public class RestconfImpl implements RestconfService { throw new RestconfDocumentedException("Input is required.", ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE); } - final URI payloadNS = payload.getData().getNodeType().getNamespace(); - if (payloadNS == null) { - throw new RestconfDocumentedException( - "Data has bad format. Root element node must have namespace (XML format) or module name(JSON format)", - ErrorType.PROTOCOL, ErrorTag.UNKNOWN_NAMESPACE); - } + // FIXME: move this to parsing stage (we can have augmentation nodes here which do not have namespace) +// final URI payloadNS = payload.getData().getNodeType().getNamespace(); +// if (payloadNS == null) { +// throw new RestconfDocumentedException( +// "Data has bad format. Root element node must have namespace (XML format) or module name(JSON format)", +// ErrorType.PROTOCOL, ErrorTag.UNKNOWN_NAMESPACE); +// } final DOMMountPoint mountPoint = payload.getInstanceIdentifierContext().getMountPoint(); final InstanceIdentifierContext iiWithData = payload.getInstanceIdentifierContext();