Merge "Bug 2260: Reduce overhead of unused TransactionProxy instances"
authorTom Pantelis <tpanteli@brocade.com>
Mon, 9 Mar 2015 23:57:47 +0000 (23:57 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 9 Mar 2015 23:57:48 +0000 (23:57 +0000)
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/AbstractIdentifierAwareJaxRsProvider.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java

index a5abd2fc69059f4af377ab85ac161372de15cbed..52b4652de6e7d464d9d54c67ebea37ca233de862 100644 (file)
@@ -132,6 +132,10 @@ public class Shard extends RaftActor {
 
     private final MessageTracker appendEntriesReplyTracker;
 
+    private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
+            Serialization.serializedActorPath(getSelf()));
+
+
     /**
      * Coordinates persistence recovery on startup.
      */
@@ -265,17 +269,17 @@ public class Shard extends RaftActor {
         }
 
         try {
-            if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+            if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                 handleCreateTransaction(message);
             } else if (message instanceof ForwardedReadyTransaction) {
                 handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
-            } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+            } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                 handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
-            } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+            } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                 handleCommitTransaction(CommitTransaction.fromSerializable(message));
-            } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+            } else if (AbortTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                 handleAbortTransaction(AbortTransaction.fromSerializable(message));
-            } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
+            } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
                 closeTransactionChain(CloseTransactionChain.fromSerializable(message));
             } else if (message instanceof RegisterChangeListener) {
                 registerChangeListener((RegisterChangeListener) message);
@@ -457,17 +461,21 @@ public class Shard extends RaftActor {
         // node. In that case, the subsequent 3-phase commit messages won't contain the
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
-        ActorRef replyActorPath = self();
         if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
             LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
-            replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+            ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                     ready.getTransactionID()));
-        }
 
-        ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
-                Serialization.serializedActorPath(replyActorPath));
-        getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
-                readyTransactionReply, getSelf());
+            ReadyTransactionReply readyTransactionReply =
+                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
+            getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+                    readyTransactionReply, getSelf());
+
+        } else {
+
+            getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
+                    READY_TRANSACTION_REPLY, getSelf());
+        }
     }
 
     private void handleAbortTransaction(final AbortTransaction abort) {
@@ -550,11 +558,11 @@ public class Shard extends RaftActor {
             throw new IllegalStateException("SchemaContext is not set");
         }
 
-        if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+        if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
 
-            shardMBean.incrementReadOnlyTransactionCount();
+            shardMBean.incrementWriteOnlyTransactionCount();
 
-            return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
+            return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
 
         } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
 
@@ -562,11 +570,12 @@ public class Shard extends RaftActor {
 
             return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
 
-        } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+        } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
 
-            shardMBean.incrementWriteOnlyTransactionCount();
+            shardMBean.incrementReadOnlyTransactionCount();
+
+            return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
 
-            return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
         } else {
             throw new IllegalArgumentException(
                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
@@ -601,10 +610,8 @@ public class Shard extends RaftActor {
     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
             String transactionChainId, short clientVersion) {
 
-        ShardTransactionIdentifier transactionId =
-            ShardTransactionIdentifier.builder()
-                .remoteTransactionId(remoteTransactionId)
-                .build();
+
+        ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);
 
         if(LOG.isDebugEnabled()) {
             LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
index d836a347c514b434db26a32cbb172d1671e99253..c441afb49787fb7d5ae946c7fc0e0e91ec7137ad 100644 (file)
@@ -150,7 +150,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     public void handleCommand(Object message) throws Exception {
-        if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
+        if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) {
             findPrimary(FindPrimary.fromSerializable(message));
         } else if(message instanceof FindLocalShard){
             findLocalShard((FindLocalShard) message);
index dd04afcb0b9f4091e73908ba8ef63d262f609353..d1f9495d862770aec58b90ad43e91f5ce1cf2a6f 100644 (file)
@@ -13,15 +13,11 @@ import com.google.common.base.Preconditions;
 public class ShardTransactionIdentifier {
     private final String remoteTransactionId;
 
-    private ShardTransactionIdentifier(String remoteTransactionId) {
+    public ShardTransactionIdentifier(String remoteTransactionId) {
         this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId,
                 "remoteTransactionId should not be null");
     }
 
-    public static Builder builder(){
-        return new Builder();
-    }
-
     public String getRemoteTransactionId() {
         return remoteTransactionId;
     }
@@ -55,17 +51,4 @@ public class ShardTransactionIdentifier {
         return sb.toString();
     }
 
-    public static class Builder {
-        private String remoteTransactionId;
-
-        public Builder remoteTransactionId(String remoteTransactionId){
-            this.remoteTransactionId = remoteTransactionId;
-            return this;
-        }
-
-        public ShardTransactionIdentifier build(){
-            return new ShardTransactionIdentifier(remoteTransactionId);
-        }
-
-    }
 }
index 7eede29b65690db530fd4b9cfb9acb130365fcb6..0fb09d8231903bbc9b530f488039adc6b8672b90 100644 (file)
@@ -15,15 +15,19 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.PoisonPill;
+import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 import akka.pattern.AskTimeoutException;
 import akka.util.Timeout;
 import com.codahale.metrics.JmxReporter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.util.concurrent.RateLimiter;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
@@ -95,6 +99,7 @@ public class ActorContext {
     private final int transactionOutstandingOperationLimit;
     private Timeout transactionCommitOperationTimeout;
     private final Dispatchers dispatchers;
+    private final Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
 
     private volatile SchemaContext schemaContext;
     private volatile boolean updated;
@@ -116,6 +121,14 @@ public class ActorContext {
         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
 
         setCachedProperties();
+        primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+                .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
+                .build();
+
+        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
+        operationTimeout = new Timeout(operationDuration);
+        transactionCommitOperationTimeout =  new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
+                TimeUnit.SECONDS));
 
         Address selfAddress = clusterWrapper.getSelfAddress();
         if (selfAddress != null && !selfAddress.host().isEmpty()) {
@@ -204,6 +217,10 @@ public class ActorContext {
     }
 
     public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
+        Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
+        if(ret != null){
+            return ret;
+        }
         Future<Object> future = executeOperationAsync(shardManager,
                 new FindPrimary(shardName, true).toSerializable(),
                 datastoreContext.getShardInitializationTimeout());
@@ -211,11 +228,13 @@ public class ActorContext {
         return future.transform(new Mapper<Object, ActorSelection>() {
             @Override
             public ActorSelection checkedApply(Object response) throws Exception {
-                if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
+                if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) {
                     PrimaryFound found = PrimaryFound.fromSerializable(response);
 
                     LOG.debug("Primary found {}", found.getPrimaryPath());
-                    return actorSystem.actorSelection(found.getPrimaryPath());
+                    ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
+                    primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
+                    return actorSelection;
                 } else if(response instanceof ActorNotInitialized) {
                     throw new NotInitializedException(
                             String.format("Found primary shard %s but it's not initialized yet. " +
@@ -325,7 +344,7 @@ public class ActorContext {
         Preconditions.checkArgument(message != null, "message must not be null");
 
         LOG.debug("Sending message {} to {}", message.getClass(), actor);
-        return ask(actor, message, timeout);
+        return doAsk(actor, message, timeout);
     }
 
     /**
@@ -361,7 +380,7 @@ public class ActorContext {
 
         LOG.debug("Sending message {} to {}", message.getClass(), actor);
 
-        return ask(actor, message, timeout);
+        return doAsk(actor, message, timeout);
     }
 
     /**
@@ -555,4 +574,16 @@ public class ActorContext {
         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
     }
 
+    protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
+        return ask(actorRef, message, timeout);
+    }
+
+    protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
+        return ask(actorRef, message, timeout);
+    }
+
+    @VisibleForTesting
+    Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
+        return primaryShardActorSelectionCache;
+    }
 }
index fd41c49390b484fd0d4343befa2f920d542e2f74..6bd732e038a00055bb2407ccc416c7f192059405 100644 (file)
@@ -2,7 +2,10 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
@@ -11,9 +14,13 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
+import akka.dispatch.Futures;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.time.StopWatch;
@@ -23,12 +30,18 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 public class ActorContextTest extends AbstractActorTest{
 
@@ -278,6 +291,7 @@ public class ActorContextTest extends AbstractActorTest{
 
         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
         doReturn("config").when(mockDataStoreContext).getDataStoreType();
+        doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
 
         ActorContext actorContext =
                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
@@ -311,6 +325,7 @@ public class ActorContextTest extends AbstractActorTest{
 
         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
         doReturn("config").when(mockDataStoreContext).getDataStoreType();
+        doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
 
         ActorContext actorContext =
                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
@@ -327,6 +342,7 @@ public class ActorContextTest extends AbstractActorTest{
 
         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
         doReturn("config").when(mockDataStoreContext).getDataStoreType();
+        doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
 
         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
 
@@ -365,4 +381,122 @@ public class ActorContextTest extends AbstractActorTest{
                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
         }};
     }
+
+    @Test
+    public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
+
+            TestActorRef<MessageCollectorActor> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+            DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+            doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+            doReturn("config").when(mockDataStoreContext).getDataStoreType();
+            doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                            mock(Configuration.class), mockDataStoreContext) {
+                        @Override
+                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                            return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
+                        }
+                    };
+
+
+            Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+            ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+
+            assertNotNull(actual);
+
+            Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+            ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+
+            assertEquals(cachedSelection, actual);
+
+            // Wait for 200 Milliseconds. The cached entry should have been removed.
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+
+            cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+            assertNull(cached);
+
+    }
+
+    @Test
+    public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
+
+            TestActorRef<MessageCollectorActor> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+            DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+            doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+            doReturn("config").when(mockDataStoreContext).getDataStoreType();
+            doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                            mock(Configuration.class), mockDataStoreContext) {
+                        @Override
+                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                            return Futures.successful((Object) new PrimaryNotFound("foobar"));
+                        }
+                    };
+
+
+            Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+
+            try {
+                Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+                fail("Expected PrimaryNotFoundException");
+            } catch(PrimaryNotFoundException e){
+
+            }
+
+            Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+            assertNull(cached);
+
+    }
+
+    @Test
+    public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+
+            TestActorRef<MessageCollectorActor> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+            DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+            doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+            doReturn("config").when(mockDataStoreContext).getDataStoreType();
+            doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                            mock(Configuration.class), mockDataStoreContext) {
+                        @Override
+                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                            return Futures.successful((Object) new ActorNotInitialized());
+                        }
+                    };
+
+
+            Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+
+            try {
+                Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+                fail("Expected NotInitializedException");
+            } catch(NotInitializedException e){
+
+            }
+
+            Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+            assertNull(cached);
+
+    }
+
 }
index 978ae0d9c5205e528a0bee6de31702251fd180f4..b158cbc4ced612ef2833b9bd55baeb45dfa21234 100644 (file)
@@ -13,7 +13,7 @@ public class AbstractIdentifierAwareJaxRsProvider {
     private UriInfo uriInfo;
 
     protected final String getIdentifier() {
-        return uriInfo.getPathParameters().getFirst(RestconfConstants.IDENTIFIER);
+        return uriInfo.getPathParameters(false).getFirst(RestconfConstants.IDENTIFIER);
     }
 
     protected final Optional<InstanceIdentifierContext> getIdentifierWithSchema() {
index dc989d2786baf14d33d762e5b40a2ff6ed283713..8d90a60e3e3d89bcc7d8d73bb7c86a3bf14ec5d6 100644 (file)
@@ -7,7 +7,7 @@
  */
 package org.opendaylight.controller.sal.rest.impl;
 
-import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
 import com.google.gson.stream.JsonReader;
 import java.io.IOException;
 import java.io.InputStream;
@@ -27,10 +27,14 @@ import org.opendaylight.controller.sal.restconf.impl.NormalizedNodeContext;
 import org.opendaylight.controller.sal.restconf.impl.RestconfDocumentedException;
 import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorTag;
 import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorType;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.data.codec.gson.JsonParserStream;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeResult;
+import org.opendaylight.yangtools.yang.model.api.SchemaNode;
+import org.opendaylight.yangtools.yang.model.util.SchemaContextUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,14 +57,25 @@ public class JsonNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPr
             final MultivaluedMap<String, String> httpHeaders, final InputStream entityStream) throws IOException,
             WebApplicationException {
         try {
-            Optional<InstanceIdentifierContext> path = getIdentifierWithSchema();
-            NormalizedNodeResult resultHolder = new NormalizedNodeResult();
-            NormalizedNodeStreamWriter writer = ImmutableNormalizedNodeStreamWriter.from(resultHolder);
-            JsonParserStream jsonParser = JsonParserStream.create(writer, path.get().getSchemaContext());
-            JsonReader reader = new JsonReader(new InputStreamReader(entityStream));
+            final InstanceIdentifierContext<?> path = getIdentifierWithSchema().get();
+            final NormalizedNodeResult resultHolder = new NormalizedNodeResult();
+            final NormalizedNodeStreamWriter writer = ImmutableNormalizedNodeStreamWriter.from(resultHolder);
+
+            final SchemaNode parentSchema = SchemaContextUtil.findDataSchemaNode(path.getSchemaContext(), path.getSchemaNode().getPath().getParent());
+            final JsonParserStream jsonParser = JsonParserStream.create(writer, path.getSchemaContext(), parentSchema);
+            final JsonReader reader = new JsonReader(new InputStreamReader(entityStream));
             jsonParser.parse(reader);
-            return new NormalizedNodeContext(path.get(),resultHolder.getResult());
-        } catch (Exception e) {
+
+            final NormalizedNode<?, ?> partialResult = resultHolder.getResult();
+            final NormalizedNode<?, ?> result;
+            if(partialResult instanceof MapNode) {
+
+                result = Iterables.getOnlyElement(((MapNode) partialResult).getValue());
+            } else {
+                result = partialResult;
+            }
+            return new NormalizedNodeContext(path,result);
+        } catch (final Exception e) {
             LOG.debug("Error parsing json input", e);
 
             throw new RestconfDocumentedException("Error parsing input: " + e.getMessage(), ErrorType.PROTOCOL,