*/
package org.opendaylight.controller.cluster.datastore.messages;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ExtendedActorSystem;
import akka.serialization.JSerializer;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import javax.annotation.Nonnull;
+import akka.util.ClassLoaderObjectInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
import org.apache.commons.lang3.SerializationUtils;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
/**
* Specialized message transformer, which transforms a {@link ReadyLocalTransaction}
* shards.
*/
public final class ReadyLocalTransactionSerializer extends JSerializer {
+
+ private final ExtendedActorSystem system;
+
+ public ReadyLocalTransactionSerializer(final ExtendedActorSystem system) {
+ this.system = requireNonNull(system);
+ }
+
@Override
public int identifier() {
return 97439437;
@Override
public byte[] toBinary(final Object obj) {
- Preconditions.checkArgument(obj instanceof ReadyLocalTransaction, "Unsupported object type %s", obj.getClass());
- final ReadyLocalTransaction msg = (ReadyLocalTransaction) obj;
- final BatchedModifications batched = new BatchedModifications(msg.getTransactionID(),
- DataStoreVersions.CURRENT_VERSION, "");
- batched.setDoCommitOnReady(msg.isDoCommitOnReady());
+ checkArgument(obj instanceof ReadyLocalTransaction, "Unsupported object type %s", obj.getClass());
+ final ReadyLocalTransaction readyLocal = (ReadyLocalTransaction) obj;
+ final BatchedModifications batched = new BatchedModifications(readyLocal.getTransactionId(),
+ readyLocal.getRemoteVersion());
+ batched.setDoCommitOnReady(readyLocal.isDoCommitOnReady());
batched.setTotalMessagesSent(1);
- batched.setReady(true);
+ batched.setReady(readyLocal.getParticipatingShardNames());
- msg.getModification().applyToCursor(new BatchedCursor(batched));
+ readyLocal.getModification().applyToCursor(new BatchedCursor(batched));
return SerializationUtils.serialize(batched);
}
@Override
public Object fromBinaryJava(final byte[] bytes, final Class<?> clazz) {
- return SerializationUtils.deserialize(bytes);
+ try (ClassLoaderObjectInputStream is = new ClassLoaderObjectInputStream(system.dynamicAccess().classLoader(),
+ new ByteArrayInputStream(bytes))) {
+ return is.readObject();
+ } catch (IOException | ClassNotFoundException e) {
+ throw new IllegalStateException("Failed to deserialize object", e);
+ }
}
- private static final class BatchedCursor implements DataTreeModificationCursor {
- private final Deque<YangInstanceIdentifier> stack = new ArrayDeque<>();
+ private static final class BatchedCursor extends AbstractBatchedModificationsCursor {
private final BatchedModifications message;
BatchedCursor(final BatchedModifications message) {
- this.message = Preconditions.checkNotNull(message);
- stack.push(YangInstanceIdentifier.EMPTY);
- }
-
- @Override
- public void delete(final PathArgument child) {
- message.addModification(new DeleteModification(stack.peek().node(child)));
- }
-
- @Override
- public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
- message.addModification(new MergeModification(stack.peek().node(child), data));
- }
-
- @Override
- public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
- message.addModification(new WriteModification(stack.peek().node(child), data));
- }
-
- @Override
- public void enter(@Nonnull final PathArgument child) {
- stack.push(stack.peek().node(child));
- }
-
- @Override
- public void enter(@Nonnull final PathArgument... path) {
- for (PathArgument arg : path) {
- enter(arg);
- }
- }
-
- @Override
- public void enter(@Nonnull final Iterable<PathArgument> path) {
- for (PathArgument arg : path) {
- enter(arg);
- }
- }
-
- @Override
- public void exit() {
- stack.pop();
- }
-
- @Override
- public void exit(final int depth) {
- Preconditions.checkArgument(depth < stack.size(), "Stack holds only %s elements, cannot exit %s levels", stack.size(), depth);
- for (int i = 0; i < depth; ++i) {
- stack.pop();
- }
- }
-
- @Override
- public Optional<NormalizedNode<?, ?>> readNode(@Nonnull final PathArgument child) {
- throw new UnsupportedOperationException("Not implemented");
+ this.message = requireNonNull(message);
}
@Override
- public void close() {
- // No-op
+ protected BatchedModifications getModifications() {
+ return message;
}
}
}