Bug 8444 - Persistent prefix-based shard cannot load its snapshot
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / test / java / org / opendaylight / controller / cluster / persistence / LocalSnapshotStoreTest.java
1 /*
2  * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.persistence;
9
10 import static java.lang.Boolean.FALSE;
11 import static java.lang.Boolean.TRUE;
12 import static org.junit.Assert.assertEquals;
13 import static org.opendaylight.controller.cluster.persistence.LocalSnapshotStoreSpecTest.SNAPSHOT_DIR;
14 import static org.opendaylight.controller.cluster.persistence.LocalSnapshotStoreSpecTest.cleanSnapshotDir;
15 import static org.opendaylight.controller.cluster.persistence.LocalSnapshotStoreSpecTest.createSnapshotDir;
16
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSystem;
19 import akka.actor.ExtendedActorSystem;
20 import akka.persistence.Persistence;
21 import akka.persistence.SelectedSnapshot;
22 import akka.persistence.SnapshotMetadata;
23 import akka.persistence.SnapshotProtocol;
24 import akka.persistence.SnapshotProtocol.LoadSnapshot;
25 import akka.persistence.SnapshotProtocol.LoadSnapshotFailed;
26 import akka.persistence.SnapshotProtocol.LoadSnapshotResult;
27 import akka.persistence.SnapshotSelectionCriteria;
28 import akka.persistence.serialization.Snapshot;
29 import akka.persistence.serialization.SnapshotSerializer;
30 import akka.testkit.JavaTestKit;
31 import com.google.common.base.Throwables;
32 import com.typesafe.config.ConfigFactory;
33 import java.io.File;
34 import java.io.FileOutputStream;
35 import java.io.IOException;
36 import java.io.UnsupportedEncodingException;
37 import java.net.URLEncoder;
38 import java.nio.charset.StandardCharsets;
39 import org.apache.commons.io.FileUtils;
40 import org.apache.commons.lang.SerializationUtils;
41 import org.junit.After;
42 import org.junit.AfterClass;
43 import org.junit.Before;
44 import org.junit.BeforeClass;
45 import org.junit.Test;
46 import org.mockito.MockitoAnnotations;
47 import scala.Option;
48
49 /**
50  * Unit tests for LocalSnapshotStore. These are in addition to LocalSnapshotStoreSpecTest to cover a few cases
51  * that SnapshotStoreSpec doesn't.
52  *
53  * @author Thomas Pantelis
54  */
55 public class LocalSnapshotStoreTest {
56     private static final String PERSISTENCE_ID = "member-1-shard-default-config";
57     private static final String PREFIX_BASED_SHARD_PERSISTENCE_ID = "member-1-shard-id-ints!-config";
58
59     private static ActorSystem system;
60     private static ActorRef snapshotStore;
61
62     @BeforeClass
63     public static void staticSetup() {
64         createSnapshotDir();
65
66         system = ActorSystem.create("test", ConfigFactory.load("LocalSnapshotStoreTest.conf"));
67         snapshotStore = system.registerExtension(Persistence.lookup()).snapshotStoreFor(null);
68     }
69
70     @AfterClass
71     public static void staticCleanup() {
72         FileUtils.deleteQuietly(SNAPSHOT_DIR);
73         JavaTestKit.shutdownActorSystem(system);
74     }
75
76     @Before
77     public void setup() {
78         MockitoAnnotations.initMocks(this);
79         cleanSnapshotDir();
80     }
81
82     @After
83     public void cleanup() {
84         cleanSnapshotDir();
85     }
86
87     @Test
88     public void testDoLoadAsync() throws IOException {
89         createSnapshotFile(PERSISTENCE_ID, "one", 0, 1000);
90         createSnapshotFile(PERSISTENCE_ID, "two", 1, 2000);
91         createSnapshotFile(PERSISTENCE_ID, "three", 1, 3000);
92
93         createSnapshotFile(PREFIX_BASED_SHARD_PERSISTENCE_ID, "foo", 0, 1000);
94         createSnapshotFile(PREFIX_BASED_SHARD_PERSISTENCE_ID, "bar", 1, 2000);
95         createSnapshotFile(PREFIX_BASED_SHARD_PERSISTENCE_ID, "foobar", 1, 3000);
96
97         createSnapshotFile("member-1-shard-default-oper", "foo", 0, 1000);
98         createSnapshotFile("member-1-shard-toaster-oper", "foo", 0, 1000);
99         new File(SNAPSHOT_DIR, "other").createNewFile();
100         new File(SNAPSHOT_DIR, "other-1485349217290").createNewFile();
101
102         SnapshotMetadata metadata3 = new SnapshotMetadata(PERSISTENCE_ID, 1, 3000);
103
104         JavaTestKit probe = new JavaTestKit(system);
105         snapshotStore.tell(new LoadSnapshot(PERSISTENCE_ID,
106                 SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
107         LoadSnapshotResult result = probe.expectMsgClass(LoadSnapshotResult.class);
108         Option<SelectedSnapshot> possibleSnapshot = result.snapshot();
109
110         assertEquals("SelectedSnapshot present", TRUE, possibleSnapshot.nonEmpty());
111         assertEquals("SelectedSnapshot metadata", metadata3, possibleSnapshot.get().metadata());
112         assertEquals("SelectedSnapshot snapshot", "three", possibleSnapshot.get().snapshot());
113
114         snapshotStore.tell(new LoadSnapshot(PREFIX_BASED_SHARD_PERSISTENCE_ID,
115                 SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
116         result = probe.expectMsgClass(LoadSnapshotResult.class);
117         possibleSnapshot = result.snapshot();
118
119         SnapshotMetadata prefixBasedShardMetada3 = new SnapshotMetadata(PREFIX_BASED_SHARD_PERSISTENCE_ID, 1, 3000);
120
121         assertEquals("SelectedSnapshot present", TRUE, possibleSnapshot.nonEmpty());
122         assertEquals("SelectedSnapshot metadata", prefixBasedShardMetada3, possibleSnapshot.get().metadata());
123         assertEquals("SelectedSnapshot snapshot", "foobar", possibleSnapshot.get().snapshot());
124     }
125
126     @Test
127     public void testDoLoadAsyncWithNoSnapshots() throws IOException {
128         JavaTestKit probe = new JavaTestKit(system);
129         snapshotStore.tell(new LoadSnapshot(PERSISTENCE_ID,
130                 SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
131         LoadSnapshotResult result = probe.expectMsgClass(LoadSnapshotResult.class);
132         Option<SelectedSnapshot> possibleSnapshot = result.snapshot();
133
134         assertEquals("SelectedSnapshot present", FALSE, possibleSnapshot.nonEmpty());
135     }
136
137     @Test
138     public void testDoLoadAsyncWithRetry() throws IOException  {
139         createSnapshotFile(PERSISTENCE_ID, "one", 0, 1000);
140         createSnapshotFile(PERSISTENCE_ID, null, 1, 2000);
141
142         SnapshotMetadata metadata = new SnapshotMetadata(PERSISTENCE_ID, 0, 1000);
143
144         JavaTestKit probe = new JavaTestKit(system);
145         snapshotStore.tell(new LoadSnapshot(PERSISTENCE_ID,
146                 SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
147         LoadSnapshotResult result = probe.expectMsgClass(LoadSnapshotResult.class);
148         Option<SelectedSnapshot> possibleSnapshot = result.snapshot();
149
150         assertEquals("SelectedSnapshot present", TRUE, possibleSnapshot.nonEmpty());
151         assertEquals("SelectedSnapshot metadata", metadata, possibleSnapshot.get().metadata());
152         assertEquals("SelectedSnapshot snapshot", "one", possibleSnapshot.get().snapshot());
153     }
154
155     @Test(expected = IOException.class)
156     public void testDoLoadAsyncWithFailure() throws IOException {
157         createSnapshotFile(PERSISTENCE_ID, null, 1, 2000);
158
159         JavaTestKit probe = new JavaTestKit(system);
160         snapshotStore.tell(new SnapshotProtocol.LoadSnapshot(PERSISTENCE_ID,
161                 SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
162         LoadSnapshotFailed failed = probe.expectMsgClass(LoadSnapshotFailed.class);
163         Throwables.propagateIfInstanceOf(failed.cause(), IOException.class);
164     }
165
166     @Test
167     public void testDoLoadAsyncWithAkkaSerializedSnapshot() throws IOException {
168         SnapshotSerializer snapshotSerializer = new SnapshotSerializer((ExtendedActorSystem) system);
169
170         String name = toSnapshotName(PERSISTENCE_ID, 1, 1000);
171         try (FileOutputStream fos = new FileOutputStream(new File(SNAPSHOT_DIR, name))) {
172             fos.write(snapshotSerializer.toBinary(new Snapshot("one")));
173         }
174
175         SnapshotMetadata metadata = new SnapshotMetadata(PERSISTENCE_ID, 1, 1000);
176
177         JavaTestKit probe = new JavaTestKit(system);
178         snapshotStore.tell(new LoadSnapshot(PERSISTENCE_ID,
179                 SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
180         LoadSnapshotResult result = probe.expectMsgClass(LoadSnapshotResult.class);
181         Option<SelectedSnapshot> possibleSnapshot = result.snapshot();
182
183         assertEquals("SelectedSnapshot present", TRUE, possibleSnapshot.nonEmpty());
184         assertEquals("SelectedSnapshot metadata", metadata, possibleSnapshot.get().metadata());
185         assertEquals("SelectedSnapshot snapshot", "one", possibleSnapshot.get().snapshot());
186     }
187
188     private void createSnapshotFile(String persistenceId, String payload, int seqNr, int timestamp) throws IOException {
189         String name = toSnapshotName(persistenceId, seqNr, timestamp);
190         try (FileOutputStream fos = new FileOutputStream(new File(SNAPSHOT_DIR, name))) {
191             if (payload != null) {
192                 fos.write(SerializationUtils.serialize(payload));
193             }
194         }
195     }
196
197     private static String toSnapshotName(String persistenceId, int seqNr, int timestamp)
198             throws UnsupportedEncodingException {
199         final String encodedPersistenceId = URLEncoder.encode(persistenceId, StandardCharsets.UTF_8.name());
200         return "snapshot-" + encodedPersistenceId + "-" + seqNr + "-" + timestamp;
201     }
202 }