--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.protobuff.client.messages;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.UnknownFieldSet;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.ref.SoftReference;
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompositeModificationByteStringPayload extends Payload implements
+ Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private ByteString byteString;
+ private SoftReference<PersistentMessages.CompositeModification> modificationReference;
+ private static final Logger LOG = LoggerFactory.getLogger(CompositeModificationByteStringPayload.class);
+
+ public CompositeModificationByteStringPayload(){
+ byteString = null;
+ }
+ public CompositeModificationByteStringPayload(Object modification){
+ this(((PersistentMessages.CompositeModification) modification).toByteString());
+ this.modificationReference = new SoftReference<>((PersistentMessages.CompositeModification) modification);
+ }
+
+ private CompositeModificationByteStringPayload(ByteString byteString){
+ this.byteString = Preconditions.checkNotNull(byteString, "byteString should not be null");
+ }
+
+
+ @Override
+ public Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> encode() {
+ Preconditions.checkState(byteString!=null);
+ Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> map = new HashMap<>();
+ map.put(org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification,
+ getModificationInternal());
+ return map;
+ }
+
+ @Override
+ public Payload decode(
+ AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) {
+ PersistentMessages.CompositeModification modification = payload
+ .getExtension(
+ org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification);
+
+ // The extension was put in the unknown field.
+ // This is because extensions need to be registered
+ // see org.opendaylight.controller.mdsal.CompositeModificationPayload.registerAllExtensions
+ // also see https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/ExtensionRegistry
+ // If that is not done then on the other end the extension shows up as an unknown field
+ // Need to figure out a better way to do this
+ if(payload.getUnknownFields().hasField(2)){
+ UnknownFieldSet.Field field =
+ payload.getUnknownFields().getField(2);
+
+ return new CompositeModificationByteStringPayload(field.getLengthDelimitedList().get(0));
+ }
+
+ return new CompositeModificationByteStringPayload(modification);
+ }
+
+ public Object getModification(){
+ return getModificationInternal();
+ }
+
+ private PersistentMessages.CompositeModification getModificationInternal(){
+ if(this.modificationReference != null && this.modificationReference.get() != null){
+ return this.modificationReference.get();
+ }
+ try {
+ PersistentMessages.CompositeModification compositeModification = PersistentMessages.CompositeModification.parseFrom(this.byteString);
+ this.modificationReference = new SoftReference<>(compositeModification);
+ return compositeModification;
+ } catch (InvalidProtocolBufferException e) {
+ LOG.error("Unexpected exception occurred when parsing byteString to CompositeModification", e);
+ }
+
+ return null;
+ }
+
+ public int size(){
+ return byteString.size();
+ }
+
+ private void writeObject(java.io.ObjectOutputStream stream)
+ throws IOException {
+ byteString.writeTo(stream);
+ }
+
+ private void readObject(java.io.ObjectInputStream stream)
+ throws IOException, ClassNotFoundException {
+ byteString = ByteString.readFrom(stream);
+ }
+
+ @VisibleForTesting
+ public void clearModificationReference(){
+ if(this.modificationReference != null) {
+ this.modificationReference.clear();
+ }
+ }
+}
\ No newline at end of file
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
cohortEntry.getCohort().preCommit().get();
Shard.this.persistData(getSender(), transactionID,
- new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
+ new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
} catch (InterruptedException | ExecutionException e) {
LOG.error(e, "An exception occurred while preCommitting transaction {}",
cohortEntry.getTransactionID());
protected void appendRecoveredLogEntry(final Payload data) {
if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+ } else if (data instanceof CompositeModificationByteStringPayload) {
+ currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
} else {
LOG.error("Unknown state received {} during recovery", data);
}
if (data instanceof CompositeModificationPayload) {
Object modification = ((CompositeModificationPayload) data).getModification();
- if(modification == null) {
- LOG.error(
- "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
- identifier, clientActor != null ? clientActor.path().toString() : null);
- } else if(clientActor == null) {
- // There's no clientActor to which to send a commit reply so we must be applying
- // replicated state from the leader.
- commitWithNewTransaction(MutableCompositeModification.fromSerializable(
- modification, schemaContext));
- } else {
- // This must be the OK to commit after replication consensus.
- finishCommit(clientActor, identifier);
- }
+ applyModificationToState(clientActor, identifier, modification);
+ } else if(data instanceof CompositeModificationByteStringPayload ){
+ Object modification = ((CompositeModificationByteStringPayload) data).getModification();
+
+ applyModificationToState(clientActor, identifier, modification);
+
} else {
LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
data, data.getClass().getClassLoader(),
}
+ private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+ if(modification == null) {
+ LOG.error(
+ "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+ identifier, clientActor != null ? clientActor.path().toString() : null);
+ } else if(clientActor == null) {
+ // There's no clientActor to which to send a commit reply so we must be applying
+ // replicated state from the leader.
+ commitWithNewTransaction(MutableCompositeModification.fromSerializable(
+ modification, schemaContext));
+ } else {
+ // This must be the OK to commit after replication consensus.
+ finishCommit(clientActor, identifier);
+ }
+ }
+
private void updateJournalStats() {
ReplicatedLogEntry lastLogEntry = getLastLogEntry();
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class CompositeModificationByteStringPayloadTest {
+
+ private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
+
+ @Test
+ public void testSerialization(){
+ WriteModification writeModification =
+ new WriteModification(TestModel.TEST_PATH, ImmutableNodes
+ .containerNode(TestModel.TEST_QNAME),
+ TestModel.createTestContext());
+
+ MutableCompositeModification compositeModification =
+ new MutableCompositeModification();
+
+ compositeModification.addModification(writeModification);
+
+ CompositeModificationByteStringPayload compositeModificationByteStringPayload
+ = new CompositeModificationByteStringPayload(compositeModification.toSerializable());
+
+ byte[] bytes = SerializationUtils.serialize(compositeModificationByteStringPayload);
+
+ Object deserialize = SerializationUtils.deserialize(bytes);
+
+ assertTrue(deserialize instanceof CompositeModificationByteStringPayload);
+
+ }
+
+ @Test
+ public void testAppendEntries(){
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+
+ CompositeModificationByteStringPayload payload = newByteStringPayload(
+ new WriteModification(TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+ SCHEMA_CONTEXT));
+
+ payload.clearModificationReference();
+
+ entries.add(new ReplicatedLogImplEntry(0, 1, payload));
+
+
+ assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
+ }
+
+
+
+ private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
+ MutableCompositeModification compMod = new MutableCompositeModification();
+ for(Modification mod: mods) {
+ compMod.addModification(mod);
+ }
+
+ return new CompositeModificationByteStringPayload(compMod.toSerializable());
+ }
+
+}
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
SCHEMA_CONTEXT))));
- int nListEntries = 11;
+ int nListEntries = 16;
Set<Integer> listEntryKeys = new HashSet<>();
- for(int i = 1; i <= nListEntries; i++) {
+ for(int i = 1; i <= nListEntries-5; i++) {
listEntryKeys.add(Integer.valueOf(i));
YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
newPayload(mod)));
}
+ // Add some of the new CompositeModificationByteStringPayload
+ for(int i = 11; i <= nListEntries; i++) {
+ listEntryKeys.add(Integer.valueOf(i));
+ YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+ Modification mod = new MergeModification(path,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
+ SCHEMA_CONTEXT);
+ InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+ newByteStringPayload(mod)));
+ }
+
+
InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
new ApplyLogEntries(nListEntries));
return new CompositeModificationPayload(compMod.toSerializable());
}
+ private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
+ MutableCompositeModification compMod = new MutableCompositeModification();
+ for(Modification mod: mods) {
+ compMod.addModification(mod);
+ }
+
+ return new CompositeModificationByteStringPayload(compMod.toSerializable());
+ }
+
+
private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
final MutableCompositeModification modification) {