Merge "BUG 2667 : Handle null value type"
authorTom Pantelis <tpanteli@brocade.com>
Wed, 4 Feb 2015 16:11:01 +0000 (16:11 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 4 Feb 2015 16:11:01 +0000 (16:11 +0000)
37 files changed:
opendaylight/md-sal/mdsal-artifacts/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationPayload.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/ImmutableCompositeModification.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/Modification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModificationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/MergeModificationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModificationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/WriteModificationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java

index 01a55e883eaf88622f65d6fbda74b7c89d020e78..f88e09cecba5b456e73b9ce25e6fc09efa74bc7c 100644 (file)
                 <type>xml</type>
                 <scope>runtime</scope>
             </dependency>
+            <dependency>
+              <groupId>${project.groupId}</groupId>
+              <artifactId>features-restconf</artifactId>
+              <version>${project.version}</version>
+              <classifier>features</classifier>
+              <type>xml</type>
+              <scope>runtime</scope>
+            </dependency>
 
             <!-- FIXME: move this into netconf-artifacts -->
             <dependency>
         </dependencies>
     </dependencyManagement>
 </project>
-
index 8f416b3abc45145e2f95307332052b66cdb4b5a1..9aff86ba2b09572a44200ac2095331d03c687a66 100644 (file)
@@ -41,7 +41,7 @@ public class ExampleActor extends RaftActor {
     private final DataPersistenceProvider dataPersistenceProvider;
 
     private long persistIdentifier = 1;
-    private Optional<ActorRef> roleChangeNotifier;
+    private final Optional<ActorRef> roleChangeNotifier;
 
 
     public ExampleActor(String id, Map<String, String> peerAddresses,
@@ -127,10 +127,10 @@ public class ExampleActor extends RaftActor {
         } catch (Exception e) {
             LOG.error(e, "Exception in creating snapshot");
         }
-        getSelf().tell(new CaptureSnapshotReply(bs), null);
+        getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
     }
 
-    @Override protected void applySnapshot(ByteString snapshot) {
+    @Override protected void applySnapshot(byte [] snapshot) {
         state.clear();
         try {
             state.putAll((HashMap) toObject(snapshot));
@@ -162,12 +162,12 @@ public class ExampleActor extends RaftActor {
         }
     }
 
-    private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
+    private Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
         Object obj = null;
         ByteArrayInputStream bis = null;
         ObjectInputStream ois = null;
         try {
-            bis = new ByteArrayInputStream(bs.toByteArray());
+            bis = new ByteArrayInputStream(bs);
             ois = new ObjectInputStream(bis);
             obj = ois.readObject();
         } finally {
@@ -215,6 +215,6 @@ public class ExampleActor extends RaftActor {
     }
 
     @Override
-    protected void applyRecoverySnapshot(ByteString snapshot) {
+    protected void applyRecoverySnapshot(byte[] snapshot) {
     }
 }
index 164c2cea561349cf178d63965eccd0a313e29b4e..c256c822a420e3a22b5a351778d58a88e73a9e8d 100644 (file)
@@ -199,7 +199,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         timer.start();
 
         // Apply the snapshot to the actors state
-        applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState()));
+        applyRecoverySnapshot(snapshot.getState());
 
         timer.stop();
         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
@@ -317,7 +317,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     snapshot.getLastAppliedTerm()
                 );
             }
-            applySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+            applySnapshot(snapshot.getState());
 
             //clears the followers log, sets the snapshot index to ensure adjusted-index works
             replicatedLog = new ReplicatedLogImpl(snapshot);
@@ -354,17 +355,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         } else if (message instanceof CaptureSnapshot) {
             LOG.info("CaptureSnapshot received by actor");
-            CaptureSnapshot cs = (CaptureSnapshot)message;
-            captureSnapshot = cs;
-            createSnapshot();
 
-        } else if (message instanceof CaptureSnapshotReply){
-            LOG.info("CaptureSnapshotReply received by actor");
-            CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
+            if(captureSnapshot == null) {
+                captureSnapshot = (CaptureSnapshot)message;
+                createSnapshot();
+            }
 
-            ByteString stateInBytes = csr.getSnapshot();
-            LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
-            handleCaptureSnapshotReply(stateInBytes);
+        } else if (message instanceof CaptureSnapshotReply){
+            handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
 
         } else {
             if (!(message instanceof AppendEntriesMessages.AppendEntries)
@@ -583,7 +581,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      *
      * @param snapshot A snapshot of the state of the actor
      */
-    protected abstract void applyRecoverySnapshot(ByteString snapshot);
+    protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
 
     /**
      * This method is called during recovery at the end of a batch to apply the current batched
@@ -612,9 +610,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * operations when the derived actor is out of sync with it's peers
      * and the only way to bring it in sync is by applying a snapshot
      *
-     * @param snapshot A snapshot of the state of the actor
+     * @param snapshotBytes A snapshot of the state of the actor
      */
-    protected abstract void applySnapshot(ByteString snapshot);
+    protected abstract void applySnapshot(byte[] snapshotBytes);
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -661,11 +659,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return peerAddress;
     }
 
-    private void handleCaptureSnapshotReply(ByteString stateInBytes) {
+    private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
+        LOG.info("CaptureSnapshotReply received by actor: snapshot size {}", snapshotBytes.length);
+
         // create a snapshot object from the state provided and save it
         // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
-        Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
+        Snapshot sn = Snapshot.create(snapshotBytes,
             context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
             captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
             captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
@@ -687,7 +687,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
             // this would be call straight to the leader and won't initiate in serialization
-            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes));
+            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
+                    ByteString.copyFrom(snapshotBytes)));
         }
 
         captureSnapshot = null;
index 96150db68982134881599f5114b6aa359ba82081..82f3e0dce02102034c54d127cdf83374313c61bc 100644 (file)
@@ -7,20 +7,15 @@
  */
 package org.opendaylight.controller.cluster.raft.base.messages;
 
-import com.google.protobuf.ByteString;
 
 public class CaptureSnapshotReply {
-    private ByteString snapshot;
+    private final byte [] snapshot;
 
-    public CaptureSnapshotReply(ByteString snapshot) {
+    public CaptureSnapshotReply(byte [] snapshot) {
         this.snapshot = snapshot;
     }
 
-    public ByteString getSnapshot() {
+    public byte [] getSnapshot() {
         return snapshot;
     }
-
-    public void setSnapshot(ByteString snapshot) {
-        this.snapshot = snapshot;
-    }
 }
index d999bb2ba1de79e59d65b5cd78614bc8b3fc2c7c..6b266d710e4aa44f793c4ed2bc809347944f1c15 100644 (file)
@@ -1,5 +1,17 @@
 package org.opendaylight.controller.cluster.raft;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
@@ -57,18 +69,6 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 public class RaftActorTest extends AbstractActorTest {
 
@@ -177,11 +177,10 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         @Override
-        protected void applyRecoverySnapshot(ByteString snapshot) {
-            delegate.applyRecoverySnapshot(snapshot);
+        protected void applyRecoverySnapshot(byte[] bytes) {
+            delegate.applyRecoverySnapshot(bytes);
             try {
-                Object data = toObject(snapshot);
-                System.out.println("!!!!!applyRecoverySnapshot: "+data);
+                Object data = toObject(bytes);
                 if (data instanceof List) {
                     state.addAll((List<?>) data);
                 }
@@ -194,7 +193,7 @@ public class RaftActorTest extends AbstractActorTest {
             delegate.createSnapshot();
         }
 
-        @Override protected void applySnapshot(ByteString snapshot) {
+        @Override protected void applySnapshot(byte [] snapshot) {
             delegate.applySnapshot(snapshot);
         }
 
@@ -216,12 +215,12 @@ public class RaftActorTest extends AbstractActorTest {
             return this.getId();
         }
 
-        private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
+        private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
             Object obj = null;
             ByteArrayInputStream bis = null;
             ObjectInputStream ois = null;
             try {
-                bis = new ByteArrayInputStream(bs.toByteArray());
+                bis = new ByteArrayInputStream(bs);
                 ois = new ObjectInputStream(bis);
                 obj = ois.readObject();
             } finally {
@@ -431,7 +430,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
 
-                verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes));
+                verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
 
                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
 
@@ -500,7 +499,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
 
-                verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(ByteString.class));
+                verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
 
                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
 
@@ -676,7 +675,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
+                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 
                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
 
@@ -722,7 +721,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 verify(mockRaftActor.delegate).createSnapshot();
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
+                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 
                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
 
@@ -814,7 +813,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
 
-                verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes));
+                verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
 
                 assertTrue("The replicatedLog should have changed",
                     oldReplicatedLog != mockRaftActor.getReplicatedLog());
@@ -859,7 +858,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
+                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 
                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
                         new Exception()));
index fc381c19a98053161fbc93c3525ca8219d006720..088f4dfbe98a1358a980a84e8ad3adaa99736b57 100644 (file)
@@ -21,7 +21,9 @@ import java.util.Map;
 import java.util.Set;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +54,8 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
 
     private final Map<String, Integer> stringCodeMap = new HashMap<>();
 
+    private NormalizedNodeWriter normalizedNodeWriter;
+
     public NormalizedNodeOutputStreamWriter(OutputStream stream) throws IOException {
         Preconditions.checkNotNull(stream);
         output = new DataOutputStream(stream);
@@ -61,6 +65,18 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
         this.output = Preconditions.checkNotNull(output);
     }
 
+    private NormalizedNodeWriter normalizedNodeWriter() {
+        if(normalizedNodeWriter == null) {
+            normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(this);
+        }
+
+        return normalizedNodeWriter;
+    }
+
+    public void writeNormalizedNode(NormalizedNode<?, ?> node) throws IOException {
+        normalizedNodeWriter().write(node);
+    }
+
     @Override
     public void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException {
         Preconditions.checkNotNull(name, "Node identifier should not be null");
index 99de5dde358ecb17ada94f8761f050c3c11ce6c7..83e10cf6afdf882a9df33e8de10f503536f0bba1 100644 (file)
@@ -24,6 +24,7 @@ import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessa
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@Deprecated
 public class CompositeModificationByteStringPayload extends Payload implements
         Serializable {
     private static final long serialVersionUID = 1L;
@@ -96,6 +97,7 @@ public class CompositeModificationByteStringPayload extends Payload implements
         return null;
     }
 
+    @Override
     public int size(){
         return byteString.size();
     }
index 075c6075a86b327775d23f4c217794700b69611f..fe5043e73d3593ac4b2966c8cfac6148d909d0d0 100644 (file)
@@ -18,9 +18,9 @@ import java.util.Map;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 
+@Deprecated
 public class CompositeModificationPayload extends Payload implements
     Serializable {
-    private static final long serialVersionUID = 1L;
 
     private final PersistentMessages.CompositeModification modification;
 
@@ -73,6 +73,7 @@ public class CompositeModificationPayload extends Payload implements
         return this.modification;
     }
 
+    @Override
     public int size(){
         return this.modification.getSerializedSize();
     }
index 7df53082948d0c69f4ea2c1223025821bd5946bd..d721494e15fcde76eb9b4f4e4f42a8e3e41bdd87 100644 (file)
@@ -51,16 +51,11 @@ import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntries
  *
  */
 public abstract class Payload {
-    private String clientPayloadClassName;
 
     public String getClientPayloadClassName() {
         return this.getClass().getName();
     }
 
-    public void setClientPayloadClassName(String clientPayloadClassName) {
-        this.clientPayloadClassName = clientPayloadClassName;
-    }
-
     /**
      * Encode the payload data as a protocol buffer extension.
      * <p>
@@ -68,6 +63,7 @@ public abstract class Payload {
      * @param <T>
      * @return Map of <GeneratedMessage.GeneratedExtension, T>
      */
+    @Deprecated
     public abstract <T extends Object> Map<GeneratedMessage.GeneratedExtension, T> encode();
 
     /**
@@ -77,11 +73,9 @@ public abstract class Payload {
      * @param payload The payload in protocol buffer format
      * @return
      */
+    @Deprecated
     public abstract Payload decode(
         AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload);
 
     public abstract int size();
-
-
-
 }
index fc10b9c23b215fd37012e75549a7b997bf41434f..84f07760f53f4a50b7fd3da61b2b3ddeb7f3fe31 100644 (file)
@@ -9,8 +9,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.Semaphore;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -63,20 +62,16 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
-            YangInstanceIdentifier path) {
+    public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
         LOG.debug("Tx {} readData called path = {}", identifier, path);
         operationLimiter.release();
-        return Futures.immediateFailedCheckedFuture(new ReadFailedException(
-                "Error reading data for path " + path, failure));
+        proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, failure));
     }
 
     @Override
-    public CheckedFuture<Boolean, ReadFailedException> dataExists(
-            YangInstanceIdentifier path) {
+    public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
         LOG.debug("Tx {} dataExists called path = {}", identifier, path);
         operationLimiter.release();
-        return Futures.immediateFailedCheckedFuture(new ReadFailedException(
-                "Error checking exists for path " + path, failure));
+        proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
     }
-}
\ No newline at end of file
+}
index a3ef0339b7571172dfb0d5b2338f52911a86cf9c..9cd758ba30fdb94e85cd1703d99a8e0c55a50a17 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
-import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
@@ -25,8 +24,7 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -51,31 +49,29 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
@@ -95,6 +91,8 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
+    private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
+
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
     @VisibleForTesting
@@ -122,8 +120,6 @@ public class Shard extends RaftActor {
 
     private SchemaContext schemaContext;
 
-    private ActorRef createSnapshotTransaction;
-
     private int createSnapshotTransactionCounter;
 
     private final ShardCommitCoordinator commitCoordinator;
@@ -242,9 +238,7 @@ public class Shard extends RaftActor {
             LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
         }
 
-        if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
-            handleReadDataReply(message);
-        } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+        if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
             handleCreateTransaction(message);
         } else if(message instanceof ForwardedReadyTransaction) {
             handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
@@ -326,9 +320,9 @@ public class Shard extends RaftActor {
                 applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
             } else {
                 Shard.this.persistData(getSender(), transactionID,
-                        new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
+                        new ModificationPayload(cohortEntry.getModification()));
             }
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (InterruptedException | ExecutionException | IOException e) {
             LOG.error(e, "An exception occurred while preCommitting transaction {}",
                     cohortEntry.getTransactionID());
             shardMBean.incrementFailedTransactionsCount();
@@ -475,20 +469,6 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void handleReadDataReply(final Object message) {
-        // This must be for install snapshot. Don't want to open this up and trigger
-        // deSerialization
-
-        self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)),
-                self());
-
-        createSnapshotTransaction = null;
-
-        // Send a PoisonPill instead of sending close transaction because we do not really need
-        // a response
-        getSender().tell(PoisonPill.getInstance(), self());
-    }
-
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
         DOMStoreTransactionChain chain =
             transactionChains.remove(closeTransactionChain.getTransactionChainId());
@@ -684,7 +664,13 @@ public class Shard extends RaftActor {
 
     @Override
     protected void appendRecoveredLogEntry(final Payload data) {
-        if (data instanceof CompositeModificationPayload) {
+        if(data instanceof ModificationPayload) {
+            try {
+                currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
+            } catch (ClassNotFoundException | IOException e) {
+                LOG.error(e, "Error extracting ModificationPayload");
+            }
+        } else if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
         } else if (data instanceof CompositeModificationByteStringPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
@@ -694,12 +680,12 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void applyRecoverySnapshot(final ByteString snapshot) {
+    protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
         if(recoveryCoordinator == null) {
             recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
         }
 
-        recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());
+        recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
 
         if(LOG.isDebugEnabled()) {
             LOG.debug("{} : submitted recovery sbapshot", persistenceId());
@@ -761,7 +747,14 @@ public class Shard extends RaftActor {
     @Override
     protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
 
-        if (data instanceof CompositeModificationPayload) {
+        if(data instanceof ModificationPayload) {
+            try {
+                applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
+            } catch (ClassNotFoundException | IOException e) {
+                LOG.error(e, "Error extracting ModificationPayload");
+            }
+        }
+        else if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
 
             applyModificationToState(clientActor, identifier, modification);
@@ -769,7 +762,6 @@ public class Shard extends RaftActor {
             Object modification = ((CompositeModificationByteStringPayload) data).getModification();
 
             applyModificationToState(clientActor, identifier, modification);
-
         } else {
             LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
                     data, data.getClass().getClassLoader(),
@@ -788,8 +780,7 @@ public class Shard extends RaftActor {
         } else if(clientActor == null) {
             // There's no clientActor to which to send a commit reply so we must be applying
             // replicated state from the leader.
-            commitWithNewTransaction(MutableCompositeModification.fromSerializable(
-                    modification, schemaContext));
+            commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
         } else {
             // This must be the OK to commit after replication consensus.
             finishCommit(clientActor, identifier);
@@ -811,24 +802,21 @@ public class Shard extends RaftActor {
 
     @Override
     protected void createSnapshot() {
-        if (createSnapshotTransaction == null) {
+        // Create a transaction actor. We are really going to treat the transaction as a worker
+        // so that this actor does not get block building the snapshot. THe transaction actor will
+        // after processing the CreateSnapshot message.
 
-            // Create a transaction. We are really going to treat the transaction as a worker
-            // so that this actor does not get block building the snapshot
-            createSnapshotTransaction = createTransaction(
+        ActorRef createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
                 "createSnapshot" + ++createSnapshotTransactionCounter, "",
                 DataStoreVersions.CURRENT_VERSION);
 
-            createSnapshotTransaction.tell(
-                new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
-
-        }
+        createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self());
     }
 
     @VisibleForTesting
     @Override
-    protected void applySnapshot(final ByteString snapshot) {
+    protected void applySnapshot(final byte[] snapshotBytes) {
         // Since this will be done only on Recovery or when this actor is a Follower
         // we can safely commit everything in here. We not need to worry about event notifications
         // as they would have already been disabled on the follower
@@ -836,17 +824,16 @@ public class Shard extends RaftActor {
         LOG.info("Applying snapshot");
         try {
             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
-            NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
-            NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
-                .decode(serializedNode);
+
+            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
 
             // delete everything first
-            transaction.delete(YangInstanceIdentifier.builder().build());
+            transaction.delete(DATASTORE_ROOT);
 
             // Add everything from the remote node back
-            transaction.write(YangInstanceIdentifier.builder().build(), node);
+            transaction.write(DATASTORE_ROOT, node);
             syncCommitTransaction(transaction);
-        } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
+        } catch (InterruptedException | ExecutionException e) {
             LOG.error(e, "An exception occurred when applying snapshot");
         } finally {
             LOG.info("Done applying snapshot");
index be9c4d80e311484d0e6edea2050d85a0ebf28d1f..6f8d0567d9aa162d4080361162fe709decd544ce 100644 (file)
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
@@ -23,6 +33,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * Date: 8/6/14
  */
 public class ShardReadTransaction extends ShardTransaction {
+    private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
+
     private final DOMStoreReadTransaction transaction;
 
     public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
@@ -39,7 +51,8 @@ public class ShardReadTransaction extends ShardTransaction {
 
         } else if (message instanceof DataExists) {
             dataExists(transaction, (DataExists) message, !SERIALIZED_REPLY);
-
+        } else if (message instanceof CreateSnapshot) {
+            createSnapshot();
         } else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
             readData(transaction, ReadData.fromSerializable(message), SERIALIZED_REPLY);
 
@@ -51,6 +64,34 @@ public class ShardReadTransaction extends ShardTransaction {
         }
     }
 
+    private void createSnapshot() {
+
+        // This is a special message sent by the shard to send back a serialized snapshot of the whole
+        // data store tree. This transaction was created for that purpose only so we can
+        // self-destruct after sending the reply.
+
+        final ActorRef sender = getSender();
+        final ActorRef self = getSelf();
+        final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = transaction.read(DATASTORE_ROOT);
+
+        Futures.addCallback(future, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+            @Override
+            public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
+                byte[] serialized = SerializationUtils.serializeNormalizedNode(result.get());
+                sender.tell(new CaptureSnapshotReply(serialized), self);
+
+                self.tell(PoisonPill.getInstance(), self);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                sender.tell(new akka.actor.Status.Failure(t), self);
+
+                self.tell(PoisonPill.getInstance(), self);
+            }
+        });
+    }
+
     @Override
     protected DOMStoreTransaction getDOMStoreTransaction() {
         return transaction;
index 94fb5841021ea994e019d946e3e106f7d95256c3..238b4e46dce041add47117503fcb68feb54e8e27 100644 (file)
@@ -7,16 +7,16 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -24,11 +24,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-
 /**
  * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
  * and journal log entry batch are de-serialized and applied to their own write transaction
@@ -73,11 +68,11 @@ class ShardRecoveryCoordinator {
     /**
      * Submits a snapshot.
      *
-     * @param snapshot the serialized snapshot
+     * @param snapshotBytes the serialized snapshot
      * @param resultingTx the write Tx to which to apply the entries
      */
-    void submit(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
-        SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshot, resultingTx);
+    void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
+        SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx);
         resultingTxList.add(resultingTx);
         executor.execute(task);
     }
@@ -121,7 +116,7 @@ class ShardRecoveryCoordinator {
         public void run() {
             for(int i = 0; i < logEntries.size(); i++) {
                 MutableCompositeModification.fromSerializable(
-                        logEntries.get(i), schemaContext).apply(resultingTx);
+                        logEntries.get(i)).apply(resultingTx);
                 // Null out to GC quicker.
                 logEntries.set(i, null);
             }
@@ -130,28 +125,22 @@ class ShardRecoveryCoordinator {
 
     private class SnapshotRecoveryTask extends ShardRecoveryTask {
 
-        private final ByteString snapshot;
+        private final byte[] snapshotBytes;
 
-        SnapshotRecoveryTask(ByteString snapshot, DOMStoreWriteTransaction resultingTx) {
+        SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
             super(resultingTx);
-            this.snapshot = snapshot;
+            this.snapshotBytes = snapshotBytes;
         }
 
         @Override
         public void run() {
-            try {
-                NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
-                NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext).decode(
-                        serializedNode);
-
-                // delete everything first
-                resultingTx.delete(YangInstanceIdentifier.builder().build());
-
-                // Add everything from the remote node back
-                resultingTx.write(YangInstanceIdentifier.builder().build(), node);
-            } catch (InvalidProtocolBufferException e) {
-                LOG.error("Error deserializing snapshot", e);
-            }
+            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+
+            // delete everything first
+            resultingTx.delete(YangInstanceIdentifier.builder().build());
+
+            // Add everything from the remote node back
+            resultingTx.write(YangInstanceIdentifier.builder().build(), node);
         }
     }
 }
index 95c7ae10c0c4f394a219cd4ae2afec78c62ed1c7..a4a2f45fdbdda87cc1166aa0e169214eea0df313 100644 (file)
@@ -23,7 +23,6 @@ import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.ImmutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
@@ -82,8 +81,7 @@ public class ShardWriteTransaction extends ShardTransaction {
 
         } else if (message instanceof GetCompositedModification) {
             // This is here for testing only
-            getSender().tell(new GetCompositeModificationReply(
-                    new ImmutableCompositeModification(modification)), getSelf());
+            getSender().tell(new GetCompositeModificationReply(modification), getSelf());
         } else {
             super.handleReceive(message);
         }
@@ -94,7 +92,7 @@ public class ShardWriteTransaction extends ShardTransaction {
         LOG.debug("writeData at path : {}", message.getPath());
 
         modification.addModification(
-                new WriteModification(message.getPath(), message.getData(), getSchemaContext()));
+                new WriteModification(message.getPath(), message.getData()));
         try {
             transaction.write(message.getPath(), message.getData());
             WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
@@ -110,7 +108,7 @@ public class ShardWriteTransaction extends ShardTransaction {
         LOG.debug("mergeData at path : {}", message.getPath());
 
         modification.addModification(
-                new MergeModification(message.getPath(), message.getData(), getSchemaContext()));
+                new MergeModification(message.getPath(), message.getData()));
 
         try {
             transaction.merge(message.getPath(), message.getData());
index b6af31e641ce48e95903f935984181340a9eaf72..1b8e65e02d6d1bad037a02beaa77310088b6e67d 100644 (file)
@@ -9,9 +9,8 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.List;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import scala.concurrent.Future;
@@ -31,10 +30,9 @@ interface TransactionContext {
 
     void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
 
-    CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
-            final YangInstanceIdentifier path);
+    void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture);
 
-    CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
+    void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture);
 
     List<Future<Object>> getRecordedOperationFutures();
-}
\ No newline at end of file
+}
index ce2c99ef52b2f6d76ee005accc60f4a9475e3253..530a36cff657304005ebd4b43a5bb1e46449951b 100644 (file)
@@ -12,7 +12,6 @@ import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.List;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
@@ -30,7 +29,6 @@ import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializa
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -179,13 +177,11 @@ final class TransactionContextImpl extends AbstractTransactionContext {
     }
 
     @Override
-    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
-            final YangInstanceIdentifier path) {
+    public void readData(
+            final YangInstanceIdentifier path,final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
 
         LOG.debug("Tx {} readData called path = {}", identifier, path);
 
-        final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
-
         // If there were any previous recorded put/merge/delete operation reply Futures then we
         // must wait for them to successfully complete. This is necessary to honor the read
         // uncommitted semantics of the public API contract. If any one fails then fail the read.
@@ -223,7 +219,6 @@ final class TransactionContextImpl extends AbstractTransactionContext {
             combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
 
-        return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
     }
 
     private void finishReadData(final YangInstanceIdentifier path,
@@ -264,13 +259,10 @@ final class TransactionContextImpl extends AbstractTransactionContext {
     }
 
     @Override
-    public CheckedFuture<Boolean, ReadFailedException> dataExists(
-            final YangInstanceIdentifier path) {
+    public void dataExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> returnFuture) {
 
         LOG.debug("Tx {} dataExists called path = {}", identifier, path);
 
-        final SettableFuture<Boolean> returnFuture = SettableFuture.create();
-
         // If there were any previous recorded put/merge/delete operation reply Futures then we
         // must wait for them to successfully complete. This is necessary to honor the read
         // uncommitted semantics of the public API contract. If any one fails then fail this
@@ -307,8 +299,6 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
             combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
-
-        return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
     }
 
     private void finishDataExists(final YangInstanceIdentifier path,
@@ -344,4 +334,4 @@ final class TransactionContextImpl extends AbstractTransactionContext {
 
         future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
     }
-}
\ No newline at end of file
+}
index d79cd6f69f4b0e3e4f1171a332b45c36adbbd515..5bc53442aeff04aa43c299a49890b3ecfe70b974 100644 (file)
@@ -18,8 +18,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -257,14 +255,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         throttleOperation();
 
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        return txFutureCallback.enqueueReadOperation(new ReadOperation<Optional<NormalizedNode<?, ?>>>() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
-            public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> invoke(
-                    TransactionContext transactionContext) {
-                return transactionContext.readData(path);
+            public void invoke(TransactionContext transactionContext) {
+                transactionContext.readData(path, proxyFuture);
             }
         });
+
+        return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
     }
 
     @Override
@@ -277,15 +278,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         throttleOperation();
 
+        final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        return txFutureCallback.enqueueReadOperation(new ReadOperation<Boolean>() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
-            public CheckedFuture<Boolean, ReadFailedException> invoke(TransactionContext transactionContext) {
-                return transactionContext.dataExists(path);
+            public void invoke(TransactionContext transactionContext) {
+                transactionContext.dataExists(path, proxyFuture);
             }
         });
-    }
 
+        return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
+    }
 
     private void checkModificationState() {
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
@@ -323,7 +327,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         throttleOperation();
 
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
                 transactionContext.writeData(path, data);
@@ -341,7 +345,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         throttleOperation();
 
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
                 transactionContext.mergeData(path, data);
@@ -359,7 +363,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         throttleOperation();
 
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
                 transactionContext.deleteData(path);
@@ -386,12 +390,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             LOG.debug("Tx {} Readying transaction for shard {} chain {}", identifier,
                         txFutureCallback.getShardName(), transactionChainId);
 
-            Future<ActorSelection> future = txFutureCallback.enqueueFutureOperation(new FutureOperation<ActorSelection>() {
-                @Override
-                public Future<ActorSelection> invoke(TransactionContext transactionContext) {
-                    return transactionContext.readyTransaction();
-                }
-            });
+            final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+            final Future<ActorSelection> future;
+            if (transactionContext != null) {
+                // avoid the creation of a promise and a TransactionOperation
+                future = transactionContext.readyTransaction();
+            } else {
+                final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
+                txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                    @Override
+                    public void invoke(TransactionContext transactionContext) {
+                        promise.completeWith(transactionContext.readyTransaction());
+                    }
+                });
+                future = promise.future();
+            }
 
             cohortFutures.add(future);
         }
@@ -430,7 +443,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     @Override
     public void close() {
         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
-            txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
                 @Override
                 public void invoke(TransactionContext transactionContext) {
                     transactionContext.closeTransaction();
@@ -492,20 +505,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         void invoke(TransactionContext transactionContext);
     }
 
-    /**
-     * This interface returns a Guava Future
-     */
-    private static interface ReadOperation<T> {
-        CheckedFuture<T, ReadFailedException> invoke(TransactionContext transactionContext);
-    }
-
-    /**
-     * This interface returns a Scala Future
-     */
-    private static interface FutureOperation<T> {
-        Future<T> invoke(TransactionContext transactionContext);
-    }
-
     /**
      * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
      * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
@@ -582,64 +581,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
 
-
-        <T> Future<T> enqueueFutureOperation(final FutureOperation<T> op) {
-
-            Future<T> future;
-
-            if (transactionContext != null) {
-                future = op.invoke(transactionContext);
-            } else {
-                // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
-                // callback to be executed after the Tx is created.
-                final Promise<T> promise = akka.dispatch.Futures.promise();
-                addTxOperationOnComplete(new TransactionOperation() {
-                    @Override
-                    public void invoke(TransactionContext transactionContext) {
-                        promise.completeWith(op.invoke(transactionContext));
-                    }
-                });
-
-                future = promise.future();
-            }
-
-            return future;
-        }
-
-        <T> CheckedFuture<T, ReadFailedException> enqueueReadOperation(final ReadOperation<T> op) {
-
-            CheckedFuture<T, ReadFailedException> future;
-
-            if (transactionContext != null) {
-                future = op.invoke(transactionContext);
-            } else {
-                // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
-                // callback to be executed after the Tx is created.
-                final SettableFuture<T> proxyFuture = SettableFuture.create();
-                addTxOperationOnComplete(new TransactionOperation() {
-                    @Override
-                    public void invoke(TransactionContext transactionContext) {
-                        Futures.addCallback(op.invoke(transactionContext), new FutureCallback<T>() {
-                            @Override
-                            public void onSuccess(T data) {
-                                proxyFuture.set(data);
-                            }
-
-                            @Override
-                            public void onFailure(Throwable t) {
-                                proxyFuture.setException(t);
-                            }
-                        });
-                    }
-                });
-
-                future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
-            }
-
-            return future;
-        }
-
-        void enqueueModifyOperation(final TransactionOperation op) {
+        void enqueueTransactionOperation(final TransactionOperation op) {
 
             if (transactionContext != null) {
                 op.invoke(transactionContext);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateSnapshot.java
new file mode 100644 (file)
index 0000000..c0d19af
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Message sent to a transaction actor to create a snapshot of the data store.
+ *
+ * @author Thomas Pantelis
+ */
+public class CreateSnapshot {
+    // Note: This class does not need to Serializable as it's only sent locally.
+
+    public static final CreateSnapshot INSTANCE = new CreateSnapshot();
+}
index 4f4f0fb8f17b3baf065c2340e97819ff97f2a43e..f04d00440405deab7e5a8be141fd624fd3c30f35 100644 (file)
@@ -11,22 +11,24 @@ package org.opendaylight.controller.cluster.datastore.modification;
 
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-import java.io.Serializable;
-
 /**
  * Base class to be used for all simple modifications that can be applied to a DOMStoreTransaction
  */
-public abstract class AbstractModification implements Modification,
-    Serializable {
+public abstract class AbstractModification implements Modification {
 
-    private static final long serialVersionUID = 1638042650152084457L;
+    private YangInstanceIdentifier path;
 
-    protected final YangInstanceIdentifier path;
+    protected AbstractModification() {
+    }
 
     protected AbstractModification(YangInstanceIdentifier path) {
         this.path = path;
     }
 
+    protected void setPath(YangInstanceIdentifier path) {
+        this.path = path;
+    }
+
     public YangInstanceIdentifier getPath() {
         return path;
     }
index 056fe756371589c7d055d054b8884f50db5bd60f..833f86fb981f1179ce326c4d3703f17c3449aa73 100644 (file)
@@ -8,7 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -19,23 +24,51 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 public class DeleteModification extends AbstractModification {
     private static final long serialVersionUID = 1L;
 
+    public DeleteModification() {
+    }
+
     public DeleteModification(YangInstanceIdentifier path) {
         super(path);
     }
 
     @Override
     public void apply(DOMStoreWriteTransaction transaction) {
-        transaction.delete(path);
+        transaction.delete(getPath());
     }
 
     @Override
+    public byte getType() {
+        return DELETE;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        in.readShort();
+        setPath(SerializationUtils.deserializePath(in));
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(DataStoreVersions.CURRENT_VERSION);
+        SerializationUtils.serializePath(getPath(), out);
+    }
+
+    @Override
+    @Deprecated
     public Object toSerializable() {
         return PersistentMessages.Modification.newBuilder().setType(this.getClass().toString())
-                .setPath(InstanceIdentifierUtils.toSerializable(this.path)).build();
+                .setPath(InstanceIdentifierUtils.toSerializable(getPath())).build();
     }
 
+    @Deprecated
     public static DeleteModification fromSerializable(Object serializable) {
         PersistentMessages.Modification o = (PersistentMessages.Modification) serializable;
         return new DeleteModification(InstanceIdentifierUtils.fromSerializable(o.getPath()));
     }
+
+    public static DeleteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
+        DeleteModification mod = new DeleteModification();
+        mod.readExternal(in);
+        return mod;
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/ImmutableCompositeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/ImmutableCompositeModification.java
deleted file mode 100644 (file)
index 2d11500..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.modification;
-
-import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-
-import java.util.List;
-
-public class ImmutableCompositeModification implements CompositeModification {
-
-    private final CompositeModification modification;
-
-    public ImmutableCompositeModification(CompositeModification modification) {
-        this.modification = modification;
-    }
-
-    @Override
-    public List<Modification> getModifications() {
-        return modification.getModifications();
-    }
-
-    @Override
-    public void apply(DOMStoreWriteTransaction transaction) {
-        modification.apply(transaction);
-    }
-
-    @Override public Object toSerializable() {
-
-        PersistentMessages.CompositeModification.Builder builder =
-            PersistentMessages.CompositeModification.newBuilder();
-
-        for (Modification m : modification.getModifications()) {
-            builder.addModification(
-                (PersistentMessages.Modification) m.toSerializable());
-        }
-
-        return builder.build();
-    }
-}
index 2f9d77660aba4d76b671b91df000baae5c5780ba..571443eedd3a89f2ce9d474cbfbd883919a8d7ff 100644 (file)
@@ -8,13 +8,14 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import java.io.IOException;
+import java.io.ObjectInput;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * MergeModification stores all the parameters required to merge data into the specified path
@@ -22,19 +23,33 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class MergeModification extends WriteModification {
     private static final long serialVersionUID = 1L;
 
-    public MergeModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
-        final SchemaContext schemaContext) {
-        super(path, data, schemaContext);
+    public MergeModification() {
+    }
+
+    public MergeModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        super(path, data);
     }
 
     @Override
     public void apply(final DOMStoreWriteTransaction transaction) {
-        transaction.merge(path, data);
+        transaction.merge(getPath(), getData());
     }
 
-    public static MergeModification fromSerializable(final Object serializable, final SchemaContext schemaContext) {
+    @Override
+    public byte getType() {
+        return MERGE;
+    }
+
+    @Deprecated
+    public static MergeModification fromSerializable(final Object serializable) {
         PersistentMessages.Modification o = (PersistentMessages.Modification) serializable;
-        Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode(o.getPath(), o.getData());
-        return new MergeModification(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext);
+        Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(o.getPath(), o.getData());
+        return new MergeModification(decoded.getDecodedPath(), decoded.getDecodedNode());
+    }
+
+    public static MergeModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
+        MergeModification mod = new MergeModification();
+        mod.readExternal(in);
+        return mod;
     }
 }
index ed9b1fe3b9548f9b529b4e1e9ce6929e41fcda11..2dfcdf028785aeb5f35369b983b59e314469289f 100644 (file)
@@ -8,7 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
-import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
+import java.io.Externalizable;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 
 /**
@@ -25,10 +25,22 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
  * which can then be applied to a write transaction
  * </p>
  */
-public interface Modification extends SerializableMessage {
-  /**
-   * Apply the modification to the specified transaction
-   * @param transaction
-   */
-  void apply(DOMStoreWriteTransaction transaction);
+public interface Modification extends Externalizable {
+
+    byte COMPOSITE = 1;
+    byte WRITE = 2;
+    byte MERGE = 3;
+    byte DELETE = 4;
+
+    /**
+     * Apply the modification to the specified transaction
+     *
+     * @param transaction
+     */
+    void apply(DOMStoreWriteTransaction transaction);
+
+    byte getType();
+
+    @Deprecated
+    Object toSerializable();
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayload.java
new file mode 100644 (file)
index 0000000..2e39157
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.modification;
+
+import com.google.protobuf.GeneratedMessage.GeneratedExtension;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry;
+
+/**
+ * Payload implementation for MutableCompositeModification used for persistence and replication.
+ *
+ * @author Thomas Pantelis
+ */
+public class ModificationPayload extends Payload implements Externalizable {
+    private static final long serialVersionUID = 1L;
+
+    private transient byte[] serializedPayload;
+
+    public ModificationPayload() {
+    }
+
+    public ModificationPayload(Modification from) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(bos);
+        out.writeObject(from);
+        out.close();
+        serializedPayload = bos.toByteArray();
+    }
+
+    public Modification getModification() throws IOException, ClassNotFoundException {
+        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedPayload));
+        Modification to = (Modification) in.readObject();
+        in.close();
+        return to;
+    }
+
+    @Override
+    public int size() {
+        return serializedPayload.length;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        int size = in.readInt();
+        serializedPayload = new byte[size];
+        in.readFully(serializedPayload);
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(serializedPayload.length);
+        out.write(serializedPayload);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    @Deprecated
+    public <T> Map<GeneratedExtension, T> encode() {
+        return null;
+    }
+
+    @Override
+    @Deprecated
+    public Payload decode(ReplicatedLogEntry.Payload payload) {
+        return null;
+    }
+}
index 04854d26b2c060ae3b231481d1933a663e6d0b49..5d7947b19fc6ddaeafe133ec546dfc879dd07855 100644 (file)
@@ -8,24 +8,30 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
-import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 
 /**
  * MutableCompositeModification is just a mutable version of a
  * CompositeModification {@link org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification#addModification(Modification)}
  */
-public class MutableCompositeModification
-    implements CompositeModification {
+public class MutableCompositeModification implements CompositeModification {
+    private static final long serialVersionUID = 1L;
 
-    private static final long serialVersionUID = 1163377899140186790L;
+    private final List<Modification> modifications;
 
-    private final List<Modification> modifications = new ArrayList<>();
+    public MutableCompositeModification() {
+        modifications = new ArrayList<>();
+    }
 
     @Override
     public void apply(DOMStoreWriteTransaction transaction) {
@@ -34,6 +40,11 @@ public class MutableCompositeModification
         }
     }
 
+    @Override
+    public byte getType() {
+        return COMPOSITE;
+    }
+
     /**
      * Add a new Modification to the list of Modifications represented by this
      * composite
@@ -44,25 +55,88 @@ public class MutableCompositeModification
         modifications.add(modification);
     }
 
+    @Override
     public List<Modification> getModifications() {
-        return Collections.unmodifiableList(modifications);
+        return modifications;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        in.readShort();
+
+        int size = in.readInt();
+
+        if(size > 1) {
+            SerializationUtils.REUSABLE_READER_TL.set(new NormalizedNodeInputStreamReader(in));
+        }
+
+        try {
+            for(int i = 0; i < size; i++) {
+                byte type = in.readByte();
+                switch(type) {
+                case Modification.WRITE:
+                    modifications.add(WriteModification.fromStream(in));
+                    break;
+
+                case Modification.MERGE:
+                    modifications.add(MergeModification.fromStream(in));
+                    break;
+
+                case Modification.DELETE:
+                    modifications.add(DeleteModification.fromStream(in));
+                    break;
+                }
+            }
+        } finally {
+            SerializationUtils.REUSABLE_READER_TL.remove();
+        }
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(DataStoreVersions.CURRENT_VERSION);
+
+        out.writeInt(modifications.size());
+
+        if(modifications.size() > 1) {
+            SerializationUtils.REUSABLE_WRITER_TL.set(new NormalizedNodeOutputStreamWriter(out));
+        }
+
+        try {
+            for(Modification mod: modifications) {
+                out.writeByte(mod.getType());
+                mod.writeExternal(out);
+            }
+        } finally {
+            SerializationUtils.REUSABLE_WRITER_TL.remove();
+        }
     }
 
-    @Override public Object toSerializable() {
+    @Override
+    @Deprecated
+    public Object toSerializable() {
         PersistentMessages.CompositeModification.Builder builder =
-            PersistentMessages.CompositeModification.newBuilder();
+                PersistentMessages.CompositeModification.newBuilder();
 
         builder.setTimeStamp(System.nanoTime());
 
         for (Modification m : modifications) {
             builder.addModification(
-                (PersistentMessages.Modification) m.toSerializable());
+                    (PersistentMessages.Modification) m.toSerializable());
         }
 
         return builder.build();
     }
 
-    public static MutableCompositeModification fromSerializable(Object serializable, SchemaContext schemaContext){
+    public static MutableCompositeModification fromSerializable(Object serializable) {
+        if(serializable instanceof MutableCompositeModification) {
+            return (MutableCompositeModification)serializable;
+        } else {
+            return fromLegacySerializable(serializable);
+        }
+    }
+
+    private static MutableCompositeModification fromLegacySerializable(Object serializable) {
         PersistentMessages.CompositeModification o = (PersistentMessages.CompositeModification) serializable;
         MutableCompositeModification compositeModification = new MutableCompositeModification();
 
@@ -70,9 +144,9 @@ public class MutableCompositeModification
             if(m.getType().equals(DeleteModification.class.toString())){
                 compositeModification.addModification(DeleteModification.fromSerializable(m));
             } else if(m.getType().equals(WriteModification.class.toString())){
-                compositeModification.addModification(WriteModification.fromSerializable(m, schemaContext));
+                compositeModification.addModification(WriteModification.fromSerializable(m));
             } else if(m.getType().equals(MergeModification.class.toString())){
-                compositeModification.addModification(MergeModification.fromSerializable(m, schemaContext));
+                compositeModification.addModification(MergeModification.fromSerializable(m));
             }
         }
 
index b2964086fd976b03516df0872d007f2f0a124af6..9c122c9adeef8a14cf05bfa877d38a5cbe310ae2 100644 (file)
@@ -8,32 +8,39 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils.Applier;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * WriteModification stores all the parameters required to write data to the specified path
  */
 public class WriteModification extends AbstractModification {
     private static final long serialVersionUID = 1L;
-    protected final NormalizedNode<?, ?> data;
-    private final SchemaContext schemaContext;
 
-    public WriteModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final SchemaContext schemaContext) {
+    private NormalizedNode<?, ?> data;
+
+    public WriteModification() {
+    }
+
+    public WriteModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
         super(path);
         this.data = data;
-        this.schemaContext = schemaContext;
     }
 
     @Override
     public void apply(final DOMStoreWriteTransaction transaction) {
-        transaction.write(path, data);
+        transaction.write(getPath(), data);
     }
 
     public NormalizedNode<?, ?> getData() {
@@ -41,19 +48,51 @@ public class WriteModification extends AbstractModification {
     }
 
     @Override
-    public Object toSerializable() {
-        Encoded encoded = new NormalizedNodeToNodeCodec(schemaContext).encode(path, data);
+    public byte getType() {
+        return WRITE;
+    }
 
-        return PersistentMessages.Modification.newBuilder()
-                .setType(this.getClass().toString())
-                .setPath(encoded.getEncodedPath())
-                .setData(encoded.getEncodedNode().getNormalizedNode())
-                .build();
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        in.readShort(); // version
+
+        SerializationUtils.deserializePathAndNode(in, this, APPLIER);
     }
 
-    public static WriteModification fromSerializable(final Object serializable, final SchemaContext schemaContext) {
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(DataStoreVersions.CURRENT_VERSION);
+        SerializationUtils.serializePathAndNode(getPath(), data, out);
+    }
+
+    @Override
+    @Deprecated
+    public Object toSerializable() {
+        Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(getPath(), getData());
+        return PersistentMessages.Modification.newBuilder().setType(this.getClass().toString())
+                .setPath(encoded.getEncodedPath()).setData(encoded.getEncodedNode()
+                        .getNormalizedNode()).build();
+    }
+
+    @Deprecated
+    public static WriteModification fromSerializable(final Object serializable) {
         PersistentMessages.Modification o = (PersistentMessages.Modification) serializable;
-        Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode(o.getPath(), o.getData());
-        return new WriteModification(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext);
+        Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(o.getPath(), o.getData());
+        return new WriteModification(decoded.getDecodedPath(), decoded.getDecodedNode());
+    }
+
+    public static WriteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
+        WriteModification mod = new WriteModification();
+        mod.readExternal(in);
+        return mod;
     }
+
+    private static final Applier<WriteModification> APPLIER = new Applier<WriteModification>() {
+        @Override
+        public void apply(WriteModification instance, YangInstanceIdentifier path,
+                NormalizedNode<?, ?> node) {
+            instance.setPath(path);
+            instance.data = node;
+        }
+    };
 }
index 87c78bd27535cdbd08ed2e86c3af3b59abb7c8ca..5854932a6fa0d999fe368aa61bfdd252821739c6 100644 (file)
@@ -8,14 +8,20 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
 import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
 
 /**
  * Provides various utility methods for serialization and de-serialization.
@@ -23,18 +29,38 @@ import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWrit
  * @author Thomas Pantelis
  */
 public final class SerializationUtils {
+    public static ThreadLocal<NormalizedNodeOutputStreamWriter> REUSABLE_WRITER_TL = new ThreadLocal<>();
+    public static ThreadLocal<NormalizedNodeInputStreamReader> REUSABLE_READER_TL = new ThreadLocal<>();
 
     public static interface Applier<T> {
         void apply(T instance, YangInstanceIdentifier path, NormalizedNode<?, ?> node);
     }
 
+    private static NormalizedNodeOutputStreamWriter streamWriter(DataOutput out) throws IOException {
+        NormalizedNodeOutputStreamWriter streamWriter = REUSABLE_WRITER_TL.get();
+        if(streamWriter == null) {
+            streamWriter = new NormalizedNodeOutputStreamWriter(out);
+        }
+
+        return streamWriter;
+    }
+
+    private static NormalizedNodeInputStreamReader streamReader(DataInput in) throws IOException {
+        NormalizedNodeInputStreamReader streamWriter = REUSABLE_READER_TL.get();
+        if(streamWriter == null) {
+            streamWriter = new NormalizedNodeInputStreamReader(in);
+        }
+
+        return streamWriter;
+    }
+
     public static void serializePathAndNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node,
             DataOutput out) {
         Preconditions.checkNotNull(path);
         Preconditions.checkNotNull(node);
         try {
-            NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
-            NormalizedNodeWriter.forStreamWriter(streamWriter).write(node);
+            NormalizedNodeOutputStreamWriter streamWriter = streamWriter(out);
+            streamWriter.writeNormalizedNode(node);
             streamWriter.writeYangInstanceIdentifier(path);
         } catch (IOException e) {
             throw new IllegalArgumentException(String.format("Error serializing path %s and Node %s",
@@ -44,7 +70,7 @@ public final class SerializationUtils {
 
     public static <T> void deserializePathAndNode(DataInput in, T instance, Applier<T> applier) {
         try {
-            NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+            NormalizedNodeInputStreamReader streamReader = streamReader(in);
             NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
             YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
             applier.apply(instance, path, node);
@@ -57,8 +83,8 @@ public final class SerializationUtils {
         try {
             out.writeBoolean(node != null);
             if(node != null) {
-                NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
-                NormalizedNodeWriter.forStreamWriter(streamWriter).write(node);
+                NormalizedNodeOutputStreamWriter streamWriter = streamWriter(out);
+                streamWriter.writeNormalizedNode(node);
             }
         } catch (IOException e) {
             throw new IllegalArgumentException(String.format("Error serializing NormalizedNode %s",
@@ -70,7 +96,7 @@ public final class SerializationUtils {
             try {
                 boolean present = in.readBoolean();
                 if(present) {
-                    NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+                    NormalizedNodeInputStreamReader streamReader = streamReader(in);
                     return streamReader.readNormalizedNode();
                 }
             } catch (IOException e) {
@@ -80,19 +106,45 @@ public final class SerializationUtils {
         return null;
     }
 
+    public static NormalizedNode<?, ?> deserializeNormalizedNode(byte [] bytes) {
+        NormalizedNode<?, ?> node = null;
+        try {
+            node = deserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes)));
+        } catch(Exception e) {
+        }
+
+        if(node == null) {
+            // Must be from legacy protobuf serialization - try that.
+            try {
+                NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(bytes);
+                node =  new NormalizedNodeToNodeCodec(null).decode(serializedNode);
+            } catch (InvalidProtocolBufferException e) {
+                throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
+            }
+        }
+
+        return node;
+    }
+
+    public static byte [] serializeNormalizedNode(NormalizedNode<?, ?> node) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        serializeNormalizedNode(node, new DataOutputStream(bos));
+        return bos.toByteArray();
+    }
+
     public static void serializePath(YangInstanceIdentifier path, DataOutput out) {
         Preconditions.checkNotNull(path);
         try {
-            NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+            NormalizedNodeOutputStreamWriter streamWriter = streamWriter(out);
             streamWriter.writeYangInstanceIdentifier(path);
         } catch (IOException e) {
-            throw new IllegalArgumentException(String.format("Error serializing path {}", path), e);
+            throw new IllegalArgumentException(String.format("Error serializing path %s", path), e);
         }
     }
 
     public static YangInstanceIdentifier deserializePath(DataInput in) {
         try {
-            NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+            NormalizedNodeInputStreamReader streamReader = streamReader(in);
             return streamReader.readYangInstanceIdentifier();
         } catch (IOException e) {
             throw new IllegalArgumentException("Error deserializing path", e);
index db9f3d1801c6698c863e3a5582059b493b0d05ee..5b7002eda2aafe923c0ad5d3b20addfd095b8a3e 100644 (file)
@@ -14,7 +14,6 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -25,6 +24,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
+@Deprecated
 public class CompositeModificationByteStringPayloadTest {
 
     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
@@ -33,8 +33,7 @@ public class CompositeModificationByteStringPayloadTest {
     public void testSerialization(){
         WriteModification writeModification =
                 new WriteModification(TestModel.TEST_PATH, ImmutableNodes
-                        .containerNode(TestModel.TEST_QNAME),
-                        TestModel.createTestContext());
+                        .containerNode(TestModel.TEST_QNAME));
 
         MutableCompositeModification compositeModification =
                 new MutableCompositeModification();
@@ -56,28 +55,20 @@ public class CompositeModificationByteStringPayloadTest {
     public void testAppendEntries(){
         List<ReplicatedLogEntry> entries = new ArrayList<>();
 
-        CompositeModificationByteStringPayload payload = newByteStringPayload(
-                new WriteModification(TestModel.OUTER_LIST_PATH,
-                        ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
-                        SCHEMA_CONTEXT));
+        WriteModification writeModification = new WriteModification(TestModel.OUTER_LIST_PATH,
+                ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
 
-        payload.clearModificationReference();
-
-        entries.add(new ReplicatedLogImplEntry(0, 1, payload));
+        MutableCompositeModification compositeModification = new MutableCompositeModification();
 
+        compositeModification.addModification(writeModification);
 
-        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
-    }
-
+        CompositeModificationByteStringPayload payload =
+                new CompositeModificationByteStringPayload(compositeModification.toSerializable());
 
+        payload.clearModificationReference();
 
-    private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
-        MutableCompositeModification compMod = new MutableCompositeModification();
-        for(Modification mod: mods) {
-            compMod.addModification(mod);
-        }
+        entries.add(new ReplicatedLogImplEntry(0, 1, payload));
 
-        return new CompositeModificationByteStringPayload(compMod.toSerializable());
+        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
     }
-
 }
index cd741672592928553fa8c463d8de7954bd2f8d78..a55f6b865d127b9f6f4abe33cd9cbb227ab2839f 100644 (file)
@@ -1,12 +1,10 @@
 package org.opendaylight.controller.cluster.datastore;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
@@ -20,19 +18,9 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
+@Deprecated
 public class CompositeModificationPayloadTest {
 
-
-    private static final String SERIALIZE_OUT = "serialize.out";
-
-    @After
-    public void shutDown(){
-        File f = new File(SERIALIZE_OUT);
-        if(f.exists()){
-            f.delete();
-        }
-    }
-
     @Test
     public void testBasic() throws IOException {
 
@@ -42,8 +30,7 @@ public class CompositeModificationPayloadTest {
             @Override public Payload getData() {
                 WriteModification writeModification =
                     new WriteModification(TestModel.TEST_PATH, ImmutableNodes
-                        .containerNode(TestModel.TEST_QNAME),
-                        TestModel.createTestContext());
+                        .containerNode(TestModel.TEST_QNAME));
 
                 MutableCompositeModification compositeModification =
                     new MutableCompositeModification();
@@ -73,11 +60,12 @@ public class CompositeModificationPayloadTest {
         AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries)
                 appendEntries.toSerializable(RaftVersions.HELIUM_VERSION);
 
-        o.writeDelimitedTo(new FileOutputStream(SERIALIZE_OUT));
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        o.writeDelimitedTo(bos);
 
         AppendEntriesMessages.AppendEntries appendEntries2 =
             AppendEntriesMessages.AppendEntries
-                .parseDelimitedFrom(new FileInputStream(SERIALIZE_OUT));
+                .parseDelimitedFrom(new ByteArrayInputStream(bos.toByteArray()));
 
         AppendEntries appendEntries1 = AppendEntries.fromSerializable(appendEntries2);
 
@@ -85,7 +73,5 @@ public class CompositeModificationPayloadTest {
 
 
         Assert.assertTrue(((CompositeModificationPayload) data).getModification().toString().contains(TestModel.TEST_QNAME.getNamespace().toString()));
-
     }
-
 }
index 42f30437c9061b916169365e2cfa506a65e8de9a..94b9698abf3a06f41527a83e5cec2461b2658266 100644 (file)
@@ -17,7 +17,9 @@ import akka.actor.Props;
 import akka.dispatch.Dispatchers;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
+import akka.japi.Procedure;
 import akka.pattern.Patterns;
+import akka.persistence.SnapshotSelectionCriteria;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Function;
@@ -43,6 +45,7 @@ import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
@@ -60,12 +63,14 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
@@ -78,7 +83,6 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@@ -364,13 +368,41 @@ public class ShardTest extends AbstractActorTest {
         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
                 "testApplySnapshot");
 
-        NormalizedNodeToNodeCodec codec =
-            new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+
+        writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+        YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
+        NormalizedNode<?,?> expected = readStore(store, root);
+
+        ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
+                SerializationUtils.serializeNormalizedNode(expected),
+                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
+
+        shard.underlyingActor().onReceiveCommand(applySnapshot);
+
+        NormalizedNode<?,?> actual = readStore(shard, root);
+
+        assertEquals("Root node", expected, actual);
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
+    @Test
+    public void testApplyHelium2VersionSnapshot() throws Exception {
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
+                "testApplySnapshot");
+
+        NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
 
-        writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+
+        writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
-        NormalizedNode<?,?> expected = readStore(shard, root);
+        NormalizedNode<?,?> expected = readStore(store, root);
 
         NormalizedNodeMessages.Container encode = codec.encode(expected);
 
@@ -382,7 +414,7 @@ public class ShardTest extends AbstractActorTest {
 
         NormalizedNode<?,?> actual = readStore(shard, root);
 
-        assertEquals(expected, actual);
+        assertEquals("Root node", expected, actual);
 
         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
@@ -394,11 +426,26 @@ public class ShardTest extends AbstractActorTest {
 
         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        MutableCompositeModification compMod = new MutableCompositeModification();
-        compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
-        Payload payload = new CompositeModificationPayload(compMod.toSerializable());
-        ApplyState applyState = new ApplyState(null, "test",
-                new ReplicatedLogImplEntry(1, 2, payload));
+        ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
+                newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
+
+        shard.underlyingActor().onReceiveCommand(applyState);
+
+        NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
+        assertEquals("Applied state", node, actual);
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
+    @Test
+    public void testApplyStateLegacy() throws Exception {
+
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateLegacy");
+
+        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
+                newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
 
         shard.underlyingActor().onReceiveCommand(applyState);
 
@@ -408,7 +455,6 @@ public class ShardTest extends AbstractActorTest {
         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    @SuppressWarnings("serial")
     @Test
     public void testRecovery() throws Exception {
 
@@ -417,61 +463,102 @@ public class ShardTest extends AbstractActorTest {
         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
 
-        DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
-        writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-        DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
-        commitCohort.preCommit().get();
-        commitCohort.commit().get();
+        writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-        DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
-        NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
+        NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
 
         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
-                new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
-                        root).
+                SerializationUtils.serializeNormalizedNode(root),
+                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+
+        // Set up the InMemoryJournal.
+
+        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
+                  new WriteModification(TestModel.OUTER_LIST_PATH,
+                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
+
+        int nListEntries = 16;
+        Set<Integer> listEntryKeys = new HashSet<>();
+
+        // Add some ModificationPayload entries
+        for(int i = 1; i <= nListEntries; i++) {
+            listEntryKeys.add(Integer.valueOf(i));
+            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+            Modification mod = new MergeModification(path,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
+            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+                    newModificationPayload(mod)));
+        }
+
+        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
+                new ApplyLogEntries(nListEntries));
+
+        testRecovery(listEntryKeys);
+    }
+
+    @Test
+    public void testHelium2VersionRecovery() throws Exception {
+
+        // Set up the InMemorySnapshotStore.
+
+        InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
+        testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+
+        writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+        NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
+
+        InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
+                new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
                                 getNormalizedNode().toByteString().toByteArray(),
                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
 
         // Set up the InMemoryJournal.
 
-        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
+        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
                   new WriteModification(TestModel.OUTER_LIST_PATH,
-                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
-                          SCHEMA_CONTEXT))));
+                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
 
         int nListEntries = 16;
         Set<Integer> listEntryKeys = new HashSet<>();
-        for(int i = 1; i <= nListEntries-5; i++) {
+        int i = 1;
+
+        // Add some CompositeModificationPayload entries
+        for(; i <= 8; i++) {
             listEntryKeys.add(Integer.valueOf(i));
             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
             Modification mod = new MergeModification(path,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
-                    SCHEMA_CONTEXT);
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
-                    newPayload(mod)));
+                    newLegacyPayload(mod)));
         }
 
-        // Add some of the new CompositeModificationByteStringPayload
-        for(int i = 11; i <= nListEntries; i++) {
+        // Add some CompositeModificationByteStringPayload entries
+        for(; i <= nListEntries; i++) {
             listEntryKeys.add(Integer.valueOf(i));
             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
             Modification mod = new MergeModification(path,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
-                    SCHEMA_CONTEXT);
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
-                    newByteStringPayload(mod)));
+                    newLegacyByteStringPayload(mod)));
         }
 
+        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
 
-        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
-                new ApplyLogEntries(nListEntries));
+        testRecovery(listEntryKeys);
+    }
 
+    private void testRecovery(Set<Integer> listEntryKeys) throws Exception {
         // Create the actor and wait for recovery complete.
 
+        int nListEntries = listEntryKeys.size();
+
         final CountDownLatch recoveryComplete = new CountDownLatch(1);
 
+        @SuppressWarnings("serial")
         Creator<Shard> creator = new Creator<Shard>() {
             @Override
             public Shard create() throws Exception {
@@ -527,7 +614,7 @@ public class ShardTest extends AbstractActorTest {
         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    private CompositeModificationPayload newPayload(final Modification... mods) {
+    private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
         MutableCompositeModification compMod = new MutableCompositeModification();
         for(Modification mod: mods) {
             compMod.addModification(mod);
@@ -536,7 +623,7 @@ public class ShardTest extends AbstractActorTest {
         return new CompositeModificationPayload(compMod.toSerializable());
     }
 
-    private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
+    private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
         MutableCompositeModification compMod = new MutableCompositeModification();
         for(Modification mod: mods) {
             compMod.addModification(mod);
@@ -545,6 +632,14 @@ public class ShardTest extends AbstractActorTest {
         return new CompositeModificationByteStringPayload(compMod.toSerializable());
     }
 
+    private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        for(Modification mod: mods) {
+            compMod.addModification(mod);
+        }
+
+        return new ModificationPayload(compMod);
+    }
 
     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
@@ -594,7 +689,7 @@ public class ShardTest extends AbstractActorTest {
             }
         }).when(cohort).abort();
 
-        modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
+        modification.addModification(new WriteModification(path, data));
 
         return cohort;
     }
@@ -1286,19 +1381,54 @@ public class ShardTest extends AbstractActorTest {
     }
 
     @Test
-    public void testCreateSnapshot() throws IOException, InterruptedException {
-            testCreateSnapshot(true, "testCreateSnapshot");
+    public void testCreateSnapshot() throws Exception {
+        testCreateSnapshot(true, "testCreateSnapshot");
     }
 
     @Test
-    public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
+    public void testCreateSnapshotWithNonPersistentData() throws Exception {
         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
     }
 
     @SuppressWarnings("serial")
-    public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws IOException, InterruptedException {
-        final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
-                shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
+    public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
+
+        final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
+        class DelegatingPersistentDataProvider implements DataPersistenceProvider {
+            DataPersistenceProvider delegate;
+
+            DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
+                this.delegate = delegate;
+            }
+
+            @Override
+            public boolean isRecoveryApplicable() {
+                return delegate.isRecoveryApplicable();
+            }
+
+            @Override
+            public <T> void persist(T o, Procedure<T> procedure) {
+                delegate.persist(o, procedure);
+            }
+
+            @Override
+            public void saveSnapshot(Object o) {
+                savedSnapshot.set(o);
+                delegate.saveSnapshot(o);
+            }
+
+            @Override
+            public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+                delegate.deleteSnapshots(criteria);
+            }
+
+            @Override
+            public void deleteMessages(long sequenceNumber) {
+                delegate.deleteMessages(sequenceNumber);
+            }
+        }
+
+        dataStoreContextBuilder.persistent(persistent);
 
         new ShardTestKit(getSystem()) {{
             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
@@ -1307,6 +1437,18 @@ public class ShardTest extends AbstractActorTest {
                 public Shard create() throws Exception {
                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
                             newDatastoreContext(), SCHEMA_CONTEXT) {
+
+                        DelegatingPersistentDataProvider delegating;
+
+                        @Override
+                        protected DataPersistenceProvider persistence() {
+                            if(delegating == null) {
+                                delegating = new DelegatingPersistentDataProvider(super.persistence());
+                            }
+
+                            return delegating;
+                        }
+
                         @Override
                         protected void commitSnapshot(final long sequenceNumber) {
                             super.commitSnapshot(sequenceNumber);
@@ -1321,16 +1463,40 @@ public class ShardTest extends AbstractActorTest {
 
             waitUntilLeader(shard);
 
-            shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
+            writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
+
+            CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1);
+            shard.tell(capture, getRef());
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
+            assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+                    savedSnapshot.get() instanceof Snapshot);
+
+            verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+
             latch.set(new CountDownLatch(1));
-            shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
+            savedSnapshot.set(null);
+
+            shard.tell(capture, getRef());
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
+            assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+                    savedSnapshot.get() instanceof Snapshot);
+
+            verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }
+
+        private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
+
+            NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
+            assertEquals("Root node", expectedRoot, actual);
+
         }};
     }
 
@@ -1437,7 +1603,12 @@ public class ShardTest extends AbstractActorTest {
 
     static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
             throws ExecutionException, InterruptedException {
-        DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
+        return readStore(shard.underlyingActor().getDataStore(), id);
+    }
+
+    public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
+            throws ExecutionException, InterruptedException {
+        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
 
         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
             transaction.read(id);
@@ -1450,9 +1621,14 @@ public class ShardTest extends AbstractActorTest {
         return node;
     }
 
-    private void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id, final NormalizedNode<?,?> node)
-        throws ExecutionException, InterruptedException {
-        DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
+    static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
+            final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
+        writeToStore(shard.underlyingActor().getDataStore(), id, node);
+    }
+
+    public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
+            final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
+        DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
 
         transaction.write(id, node);
 
index efae106617c2a656ce560aa14de53e1d8c609c9f..69dd706f37cb3bd8c1e7c75d91ec05e2b0fe1f13 100644 (file)
@@ -12,7 +12,7 @@ import akka.testkit.TestActorRef;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
@@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
@@ -39,17 +40,19 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 
 public class ShardTransactionTest extends AbstractActorTest {
-    private static final InMemoryDOMDataStore store =
-        new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
 
     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
 
@@ -61,8 +64,11 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
 
-    @BeforeClass
-    public static void staticSetup() {
+    private final InMemoryDOMDataStore store =
+            new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+
+    @Before
+    public void setup() {
         store.onGlobalContextUpdated(testSchemaContext);
     }
 
@@ -71,21 +77,33 @@ public class ShardTransactionTest extends AbstractActorTest {
             Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
     }
 
+    private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
+        return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
+        return newTransactionActor(transaction, null, name, version);
+    }
+
+    private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
+        return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
+            short version) {
+        Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
+                testSchemaContext, datastoreContext, shardStats, "txn", version);
+        return getSystem().actorOf(props, name);
+    }
+
     @Test
     public void testOnReceiveReadData() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
-            Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.CURRENT_VERSION);
 
-            testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
+            testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
 
-            props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.CURRENT_VERSION);
-
-            testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
+            testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
         }
 
         private void testOnReceiveReadData(final ActorRef transaction) {
@@ -111,19 +129,12 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
-            Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.CURRENT_VERSION);
-
-            testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
-                    props, "testReadDataWhenDataNotFoundRO"));
 
-            props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.CURRENT_VERSION);
+            testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
+                    store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
 
-            testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
-                    props, "testReadDataWhenDataNotFoundRW"));
+            testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
+                    store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
         }
 
         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
@@ -147,12 +158,8 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveReadDataHeliumR1() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = createShard();
-            Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.HELIUM_1_VERSION);
-
-            ActorRef transaction = getSystem().actorOf(props, "testOnReceiveReadDataHeliumR1");
+            ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+                    "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
 
             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
                     getRef());
@@ -168,17 +175,12 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testOnReceiveDataExistsPositive() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
-            Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.CURRENT_VERSION);
-
-            testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
 
-            props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.CURRENT_VERSION);
+            testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
+                    "testDataExistsPositiveRO"));
 
-            testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
+            testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
+                    "testDataExistsPositiveRW"));
         }
 
         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
@@ -203,17 +205,12 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testOnReceiveDataExistsNegative() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
-            Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.CURRENT_VERSION);
 
-            testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
+            testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
+                    "testDataExistsNegativeRO"));
 
-            props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.CURRENT_VERSION);
-
-            testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
+            testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
+                    "testDataExistsNegativeRW"));
         }
 
         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
@@ -249,11 +246,8 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveWriteData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = createShard();
-            final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.CURRENT_VERSION);
-            final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveWriteData");
+            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+                    "testOnReceiveWriteData");
 
             transaction.tell(new WriteData(TestModel.TEST_PATH,
                     ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
@@ -275,11 +269,8 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveHeliumR1WriteData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = createShard();
-            final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.HELIUM_1_VERSION);
-            final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1WriteData");
+            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+                    "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
 
             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@@ -298,11 +289,8 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveMergeData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = createShard();
-            final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.CURRENT_VERSION);
-            final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
+            final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+                    "testMergeData");
 
             transaction.tell(new MergeData(TestModel.TEST_PATH,
                     ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
@@ -324,11 +312,8 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveHeliumR1MergeData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = createShard();
-            final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.HELIUM_1_VERSION);
-            final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1MergeData");
+            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+                    "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
 
             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@@ -347,11 +332,8 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveDeleteData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = createShard();
-            final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.CURRENT_VERSION);
-            final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
+            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+                    "testDeleteData");
 
             transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(
                     DataStoreVersions.HELIUM_2_VERSION), getRef());
@@ -371,11 +353,8 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveReadyTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = createShard();
-            final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.CURRENT_VERSION);
-            final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
+            final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+                    "testReadyTransaction");
 
             watch(transaction);
 
@@ -389,11 +368,8 @@ public class ShardTransactionTest extends AbstractActorTest {
 
         // test
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = createShard();
-            final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
-            final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
+            final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+                    "testReadyTransaction2");
 
             watch(transaction);
 
@@ -404,18 +380,42 @@ public class ShardTransactionTest extends AbstractActorTest {
             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
                     Terminated.class);
         }};
+    }
 
+    @Test
+    public void testOnReceiveCreateSnapshot() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            ShardTest.writeToStore(store, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
+                    YangInstanceIdentifier.builder().build());
+
+            final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+                    "testOnReceiveCreateSnapshot");
+
+            watch(transaction);
+
+            transaction.tell(CreateSnapshot.INSTANCE, getRef());
+
+            CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
+
+            assertNotNull("getSnapshot is null", reply.getSnapshot());
+
+            NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
+                    reply.getSnapshot());
+
+            assertEquals("Root node", expectedRoot, actualRoot);
+
+            expectTerminated(duration("3 seconds"), transaction);
+        }};
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = createShard();
-            final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.CURRENT_VERSION);
-            final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
+            final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+                    "testCloseTransaction");
 
             watch(transaction);
 
@@ -445,12 +445,8 @@ public class ShardTransactionTest extends AbstractActorTest {
                 Duration.create(500, TimeUnit.MILLISECONDS)).build();
 
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = createShard();
-            final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn",
-                    DataStoreVersions.CURRENT_VERSION);
-            final ActorRef transaction =
-                getSystem().actorOf(props, "testShardTransactionInactivity");
+            final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+                    "testShardTransactionInactivity");
 
             watch(transaction);
 
index b33f902929a470eae42bb51f19718a5846d7c5e4..9daaa0da9739640d35cfac49fe7323d27378735c 100644 (file)
@@ -1,35 +1,49 @@
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import static org.junit.Assert.assertEquals;
 import com.google.common.base.Optional;
+import org.apache.commons.lang.SerializationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
-public class DeleteModificationTest extends AbstractModificationTest{
-
-  @Test
-  public void testApply() throws Exception {
-    //Write something into the datastore
-    DOMStoreReadWriteTransaction writeTransaction = store.newReadWriteTransaction();
-    WriteModification writeModification = new WriteModification(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext());
-    writeModification.apply(writeTransaction);
-    commitTransaction(writeTransaction);
-
-    //Check if it's in the datastore
-    Optional<NormalizedNode<?,?>> data = readData(TestModel.TEST_PATH);
-    Assert.assertTrue(data.isPresent());
-
-    //Delete stuff from the datastore
-    DOMStoreWriteTransaction deleteTransaction = store.newWriteOnlyTransaction();
-    DeleteModification deleteModification = new DeleteModification(TestModel.TEST_PATH);
-    deleteModification.apply(deleteTransaction);
-    commitTransaction(deleteTransaction);
-
-    data = readData(TestModel.TEST_PATH);
-    Assert.assertFalse(data.isPresent());
-  }
+public class DeleteModificationTest extends AbstractModificationTest {
+
+    @Test
+    public void testApply() throws Exception {
+        // Write something into the datastore
+        DOMStoreReadWriteTransaction writeTransaction = store.newReadWriteTransaction();
+        WriteModification writeModification = new WriteModification(TestModel.TEST_PATH,
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        writeModification.apply(writeTransaction);
+        commitTransaction(writeTransaction);
+
+        // Check if it's in the datastore
+        Optional<NormalizedNode<?, ?>> data = readData(TestModel.TEST_PATH);
+        Assert.assertTrue(data.isPresent());
+
+        // Delete stuff from the datastore
+        DOMStoreWriteTransaction deleteTransaction = store.newWriteOnlyTransaction();
+        DeleteModification deleteModification = new DeleteModification(TestModel.TEST_PATH);
+        deleteModification.apply(deleteTransaction);
+        commitTransaction(deleteTransaction);
+
+        data = readData(TestModel.TEST_PATH);
+        Assert.assertFalse(data.isPresent());
+    }
+
+    @Test
+    public void testSerialization() {
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+
+        DeleteModification expected = new DeleteModification(path);
+
+        DeleteModification clone = (DeleteModification) SerializationUtils.clone(expected);
+        assertEquals("getPath", expected.getPath(), clone.getPath());
+    }
 }
index 5d2021167b52564c76a636fc29dc8c94e2d80e66..a69d9388a7b1e25f8d19f22ba426785aeae8e375 100644 (file)
@@ -1,13 +1,16 @@
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import static org.junit.Assert.assertEquals;
 import com.google.common.base.Optional;
+import org.apache.commons.lang.SerializationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
 public class MergeModificationTest extends AbstractModificationTest{
 
@@ -17,7 +20,8 @@ public class MergeModificationTest extends AbstractModificationTest{
 
         //Write something into the datastore
         DOMStoreReadWriteTransaction writeTransaction = store.newReadWriteTransaction();
-        MergeModification writeModification = new MergeModification(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext());
+        MergeModification writeModification = new MergeModification(TestModel.TEST_PATH,
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
         writeModification.apply(writeTransaction);
         commitTransaction(writeTransaction);
 
@@ -29,16 +33,15 @@ public class MergeModificationTest extends AbstractModificationTest{
 
     @Test
     public void testSerialization() {
-        SchemaContext schemaContext = TestModel.createTestContext();
-        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-        MergeModification mergeModification = new MergeModification(TestModel.TEST_PATH,
-                node, schemaContext);
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        Object serialized = mergeModification.toSerializable();
+        MergeModification expected = new MergeModification(path, data);
 
-        MergeModification newModification = MergeModification.fromSerializable(serialized, schemaContext);
-
-        Assert.assertEquals("getPath", TestModel.TEST_PATH, newModification.getPath());
-        Assert.assertEquals("getData", node, newModification.getData());
+        MergeModification clone = (MergeModification) SerializationUtils.clone(expected);
+        assertEquals("getPath", expected.getPath(), clone.getPath());
+        assertEquals("getData", expected.getData(), clone.getData());
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java
new file mode 100644 (file)
index 0000000..bbfff70
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.modification;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for ModificationPayload.
+ *
+ * @author Thomas Pantelis
+ */
+public class ModificationPayloadTest {
+
+    @Test
+    public void test() throws Exception {
+
+        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        MutableCompositeModification compositeModification = new MutableCompositeModification();
+        compositeModification.addModification(new WriteModification(writePath, writeData));
+
+        ModificationPayload payload = new ModificationPayload(compositeModification);
+
+        MutableCompositeModification deserialized = (MutableCompositeModification) payload.getModification();
+
+        assertEquals("getModifications size", 1, deserialized.getModifications().size());
+        WriteModification write = (WriteModification)deserialized.getModifications().get(0);
+        assertEquals("getPath", writePath, write.getPath());
+        assertEquals("getData", writeData, write.getData());
+
+        ModificationPayload cloned =
+                (ModificationPayload) SerializationUtils.clone(payload);
+
+        deserialized = (MutableCompositeModification) payload.getModification();
+
+        assertEquals("getModifications size", 1, deserialized.getModifications().size());
+        write = (WriteModification)deserialized.getModifications().get(0);
+        assertEquals("getPath", writePath, write.getPath());
+        assertEquals("getData", writeData, write.getData());
+    }
+}
index f8116aa78d72f4b08597cee693d592ca8c32ed87..8ae2a8657d2ec05abd439c6f60376d28b4516114 100644 (file)
@@ -1,15 +1,18 @@
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
 public class MutableCompositeModificationTest extends AbstractModificationTest {
 
@@ -18,7 +21,7 @@ public class MutableCompositeModificationTest extends AbstractModificationTest {
 
         MutableCompositeModification compositeModification = new MutableCompositeModification();
         compositeModification.addModification(new WriteModification(TestModel.TEST_PATH,
-            ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()));
+            ImmutableNodes.containerNode(TestModel.TEST_QNAME)));
 
         DOMStoreReadWriteTransaction transaction = store.newReadWriteTransaction();
         compositeModification.apply(transaction);
@@ -31,13 +34,68 @@ public class MutableCompositeModificationTest extends AbstractModificationTest {
     }
 
     @Test
-    public void testEverySerializedCompositeModificationObjectMustBeDifferent(){
+    public void testSerialization() {
+        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
+        NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
+
+        YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
+
         MutableCompositeModification compositeModification = new MutableCompositeModification();
-        compositeModification.addModification(new WriteModification(TestModel.TEST_PATH,
-            ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()));
-        Object one = compositeModification.toSerializable();
-        try{Thread.sleep(10);}catch(Exception err){}
-        Object two = compositeModification.toSerializable();
-        assertNotEquals(one,two);
+        compositeModification.addModification(new WriteModification(writePath, writeData));
+        compositeModification.addModification(new MergeModification(mergePath, mergeData));
+        compositeModification.addModification(new DeleteModification(deletePath));
+
+        MutableCompositeModification clone = (MutableCompositeModification) SerializationUtils.clone(compositeModification);
+
+        assertEquals("getModifications size", 3, clone.getModifications().size());
+
+        WriteModification write = (WriteModification)clone.getModifications().get(0);
+        assertEquals("getPath", writePath, write.getPath());
+        assertEquals("getData", writeData, write.getData());
+
+        MergeModification merge = (MergeModification)clone.getModifications().get(1);
+        assertEquals("getPath", mergePath, merge.getPath());
+        assertEquals("getData", mergeData, merge.getData());
+
+        DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
+        assertEquals("getPath", deletePath, delete.getPath());
+    }
+
+    @Test
+    @Ignore
+    public void testSerializationScale() throws Exception {
+        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        MutableCompositeModification compositeModification = new MutableCompositeModification();
+        for(int i = 0; i < 1000; i++) {
+            compositeModification.addModification(new WriteModification(writePath, writeData));
+        }
+
+        Stopwatch sw = new Stopwatch();
+        sw.start();
+        for(int i = 0; i < 1000; i++) {
+            new ModificationPayload(compositeModification);
+        }
+
+        sw.stop();
+        System.out.println("Elapsed: "+sw);
+
+        ModificationPayload p = new ModificationPayload(compositeModification);
+        sw.start();
+        for(int i = 0; i < 1000; i++) {
+            p.getModification();
+        }
+
+        sw.stop();
+        System.out.println("Elapsed: "+sw);
     }
 }
index 3a82fffccb16ffe9e02b3e85c2106213e749dce1..2e9ce224b7e482edf32cefd5b4f590afef04d4c7 100644 (file)
@@ -1,22 +1,25 @@
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import static org.junit.Assert.assertEquals;
 import com.google.common.base.Optional;
+import org.apache.commons.lang.SerializationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
-public class WriteModificationTest extends AbstractModificationTest{
+public class WriteModificationTest extends AbstractModificationTest {
 
     @Test
     public void testApply() throws Exception {
         //Write something into the datastore
         DOMStoreReadWriteTransaction writeTransaction = store.newReadWriteTransaction();
         WriteModification writeModification = new WriteModification(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext());
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
         writeModification.apply(writeTransaction);
         commitTransaction(writeTransaction);
 
@@ -27,16 +30,15 @@ public class WriteModificationTest extends AbstractModificationTest{
 
     @Test
     public void testSerialization() {
-        SchemaContext schemaContext = TestModel.createTestContext();
-        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-        WriteModification writeModification = new WriteModification(TestModel.TEST_PATH,
-                node, schemaContext);
-
-        Object serialized = writeModification.toSerializable();
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        WriteModification newModification = WriteModification.fromSerializable(serialized, schemaContext);
+        WriteModification expected = new WriteModification(path, data);
 
-        Assert.assertEquals("getPath", TestModel.TEST_PATH, newModification.getPath());
-        Assert.assertEquals("getData", node, newModification.getData());
+        WriteModification clone = (WriteModification) SerializationUtils.clone(expected);
+        assertEquals("getPath", expected.getPath(), clone.getPath());
+        assertEquals("getData", expected.getData(), clone.getData());
     }
 }
index a3041e89dbf9ac2a9139552c6cafff7437bc0390..79c1bb4720e9790599b99bb17c6cf6d823275b86 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -72,16 +73,14 @@ public class Client {
             @Override public Payload getData() {
                 WriteModification writeModification =
                     new WriteModification(TestModel.TEST_PATH, ImmutableNodes
-                        .containerNode(TestModel.TEST_QNAME),
-                        TestModel.createTestContext()
-                    );
+                        .containerNode(TestModel.TEST_QNAME));
 
                 MutableCompositeModification compositeModification =
                     new MutableCompositeModification();
 
                 compositeModification.addModification(writeModification);
 
-                return new CompositeModificationPayload(
+                return new CompositeModificationByteStringPayload(
                     compositeModification.toSerializable());
             }