5f75e495af5b7db6f7f4ab4da2fab5cf3ee3d19a
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / MessageSlicer.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.cache.RemovalNotification;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.io.Serializable;
19 import java.util.Iterator;
20 import java.util.Optional;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.atomic.AtomicLong;
23 import java.util.function.Predicate;
24 import javax.annotation.Nonnull;
25 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
26 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
27 import org.opendaylight.yangtools.concepts.Identifier;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * This class slices messages into smaller chunks. {@link MessageAssembler} is used to re-assemble the messages.
33  *
34  * @author Thomas Pantelis
35  * @see MessageAssembler
36  */
37 public class MessageSlicer implements AutoCloseable {
38     private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
39     private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
40     public static final int DEFAULT_MAX_SLICING_TRIES = 3;
41
42     private final Cache<MessageSliceIdentifier, SlicedMessageState<ActorRef>> stateCache;
43     private final FileBackedOutputStreamFactory fileBackedStreamFactory;
44     private final int messageSliceSize;
45     private final int maxSlicingTries;
46     private final String logContext;
47     private final long id;
48
49     MessageSlicer(final Builder builder) {
50         this.fileBackedStreamFactory = builder.fileBackedStreamFactory;
51         this.messageSliceSize = builder.messageSliceSize;
52         this.maxSlicingTries = builder.maxSlicingTries;
53
54         id = SLICER_ID_COUNTER.getAndIncrement();
55         this.logContext = builder.logContext + "_slicer-id-" + id;
56
57         CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder =
58                 CacheBuilder.newBuilder().removalListener(notification -> stateRemoved(notification));
59         if (builder.expireStateAfterInactivityDuration > 0) {
60             cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration,
61                     builder.expireStateAfterInactivityUnit);
62         }
63         stateCache = cacheBuilder.build();
64     }
65
66     @VisibleForTesting
67     long getId() {
68         return id;
69     }
70
71     /**
72      * Returns a new Builder for creating MessageSlicer instances.
73      *
74      * @return a Builder instance
75      */
76     public static Builder builder() {
77         return new Builder();
78     }
79
80     /**
81      * Checks if the given message is handled by this class. If so, it should be forwarded to the
82      * {@link #handleMessage(Object)} method
83      *
84      * @param message the message to check
85      * @return true if handled, false otherwise
86      */
87     public static boolean isHandledMessage(final Object message) {
88         return message instanceof MessageSliceReply;
89     }
90
91     /**
92      * Slices a message into chunks based on the serialized size, the maximum message slice size and the given
93      * options.
94      *
95      * @param options the SliceOptions
96      * @return true if the message was sliced, false otherwise
97      */
98     public boolean slice(final SliceOptions options) {
99         final Identifier identifier = options.getIdentifier();
100         final Serializable message = options.getMessage();
101         final FileBackedOutputStream fileBackedStream;
102         if (message != null) {
103             LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
104
105             Preconditions.checkNotNull(fileBackedStreamFactory,
106                     "The FiledBackedStreamFactory must be set in order to call this slice method");
107
108             // Serialize the message to a FileBackedOutputStream.
109             fileBackedStream = fileBackedStreamFactory.newInstance();
110             try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
111                 out.writeObject(message);
112             } catch (IOException e) {
113                 LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
114                 fileBackedStream.cleanup();
115                 options.getOnFailureCallback().accept(e);
116                 return false;
117             }
118         } else {
119             fileBackedStream = options.getFileBackedStream();
120         }
121
122         return initializeSlicing(options, fileBackedStream);
123     }
124
125     private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
126         final Identifier identifier = options.getIdentifier();
127         MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
128         SlicedMessageState<ActorRef> state = null;
129         try {
130             state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
131                     options.getReplyTo(), options.getOnFailureCallback(), logContext);
132
133             final Serializable message = options.getMessage();
134             if (state.getTotalSlices() == 1 && message != null) {
135                 LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
136                 state.close();
137                 sendTo(options, message, options.getReplyTo());
138                 return false;
139             }
140
141             final MessageSlice firstSlice = getNextSliceMessage(state);
142
143             LOG.debug("{}: Sending first slice: {}", logContext, firstSlice);
144
145             stateCache.put(messageSliceId, state);
146             sendTo(options, firstSlice, ActorRef.noSender());
147             return true;
148         } catch (IOException e) {
149             LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
150             if (state != null) {
151                 state.close();
152             } else {
153                 fileBackedStream.cleanup();
154             }
155
156             options.getOnFailureCallback().accept(e);
157             return false;
158         }
159     }
160
161     private static void sendTo(final SliceOptions options, final Object message, final ActorRef sender) {
162         if (options.getSendToRef() != null) {
163             options.getSendToRef().tell(message, sender);
164         } else {
165             options.getSendToSelection().tell(message, sender);
166         }
167     }
168
169     /**
170      * Invoked to handle messages pertaining to this class.
171      *
172      * @param message the message
173      * @return true if the message was handled, false otherwise
174      */
175     public boolean handleMessage(final Object message) {
176         if (message instanceof MessageSliceReply) {
177             LOG.debug("{}: handleMessage: {}", logContext, message);
178             return onMessageSliceReply((MessageSliceReply) message);
179         }
180
181         return false;
182     }
183
184     /**
185      * Checks for and removes sliced message state that has expired due to inactivity from the assembling component
186      * on the other end.
187      */
188     public void checkExpiredSlicedMessageState() {
189         if (stateCache.size() > 0) {
190             stateCache.cleanUp();
191         }
192     }
193
194     /**
195      * Closes and removes all in-progress sliced message state.
196      */
197     @Override
198     public void close() {
199         LOG.debug("{}: Closing", logContext);
200         stateCache.invalidateAll();
201     }
202
203     /**
204      * Cancels all in-progress sliced message state that matches the given filter.
205      *
206      * @param filter filters by Identifier
207      */
208     public void cancelSlicing(@Nonnull final Predicate<Identifier> filter) {
209         final Iterator<MessageSliceIdentifier> iter = stateCache.asMap().keySet().iterator();
210         while (iter.hasNext()) {
211             if (filter.test(iter.next().getClientIdentifier())) {
212                 iter.remove();
213             }
214         }
215     }
216
217     private static MessageSlice getNextSliceMessage(final SlicedMessageState<ActorRef> state) throws IOException {
218         final byte[] firstSliceBytes = state.getNextSlice();
219         return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
220                 state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
221     }
222
223     private boolean onMessageSliceReply(final MessageSliceReply reply) {
224         final Identifier identifier = reply.getIdentifier();
225         if (!(identifier instanceof MessageSliceIdentifier)
226                 || ((MessageSliceIdentifier)identifier).getSlicerId() != id) {
227             return false;
228         }
229
230         final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
231         if (state == null) {
232             LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
233             reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
234             return true;
235         }
236
237         synchronized (state) {
238             try {
239                 final Optional<MessageSliceException> failure = reply.getFailure();
240                 if (failure.isPresent()) {
241                     LOG.warn("{}: Received failed {}", logContext, reply);
242                     processMessageSliceException(failure.get(), state, reply.getSendTo());
243                     return true;
244                 }
245
246                 if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
247                     LOG.warn("{}: Slice index {} in {} does not match expected index {}", logContext,
248                             reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
249                     reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
250                     possiblyRetrySlicing(state, reply.getSendTo());
251                     return true;
252                 }
253
254                 if (state.isLastSlice(reply.getSliceIndex())) {
255                     LOG.debug("{}: Received last slice reply for {}", logContext, identifier);
256                     removeState(identifier);
257                 } else {
258                     final MessageSlice nextSlice = getNextSliceMessage(state);
259                     LOG.debug("{}: Sending next slice: {}", logContext, nextSlice);
260                     reply.getSendTo().tell(nextSlice, ActorRef.noSender());
261                 }
262             } catch (IOException e) {
263                 LOG.warn("{}: Error processing {}", logContext, reply, e);
264                 fail(state, e);
265             }
266         }
267
268         return true;
269     }
270
271     private void processMessageSliceException(final MessageSliceException exception,
272             final SlicedMessageState<ActorRef> state, final ActorRef sendTo) throws IOException {
273         if (exception.isRetriable()) {
274             possiblyRetrySlicing(state, sendTo);
275         } else {
276             fail(state, exception.getCause() != null ? exception.getCause() : exception);
277         }
278     }
279
280     private void possiblyRetrySlicing(final SlicedMessageState<ActorRef> state, final ActorRef sendTo)
281             throws IOException {
282         if (state.canRetry()) {
283             LOG.info("{}: Retrying message slicing for {}", logContext, state.getIdentifier());
284             state.reset();
285             sendTo.tell(getNextSliceMessage(state), ActorRef.noSender());
286         } else {
287             String message = String.format("Maximum slicing retries reached for identifier %s - failing the message",
288                     state.getIdentifier());
289             LOG.warn(message);
290             fail(state, new RuntimeException(message));
291         }
292     }
293
294     private void removeState(final Identifier identifier) {
295         LOG.debug("{}: Removing state for {}", logContext, identifier);
296         stateCache.invalidate(identifier);
297     }
298
299     private void stateRemoved(final RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
300         final SlicedMessageState<ActorRef> state = notification.getValue();
301         state.close();
302         if (notification.wasEvicted()) {
303             LOG.warn("{}: SlicedMessageState for {} was expired from the cache", logContext, notification.getKey());
304             state.getOnFailureCallback().accept(new RuntimeException(String.format(
305                     "The slicing state for message identifier %s was expired due to inactivity from the assembling "
306                      + "component on the other end", state.getIdentifier())));
307         } else {
308             LOG.debug("{}: SlicedMessageState for {} was removed from the cache due to {}", logContext,
309                     notification.getKey(), notification.getCause());
310         }
311     }
312
313     private void fail(final SlicedMessageState<ActorRef> state, final Throwable failure) {
314         removeState(state.getIdentifier());
315         state.getOnFailureCallback().accept(failure);
316     }
317
318     @VisibleForTesting
319     boolean hasState(final Identifier forIdentifier) {
320         boolean exists = stateCache.getIfPresent(forIdentifier) != null;
321         stateCache.cleanUp();
322         return exists;
323     }
324
325     public static class Builder {
326         private FileBackedOutputStreamFactory fileBackedStreamFactory;
327         private int messageSliceSize = -1;
328         private long expireStateAfterInactivityDuration = -1;
329         private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
330         private int maxSlicingTries = DEFAULT_MAX_SLICING_TRIES;
331         private String logContext = "<no-context>";
332
333         /**
334          * Sets the factory for creating FileBackedOutputStream instances used for streaming messages. This factory
335          * is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed.
336          * If Serializable messages aren't passed then the factory need not be set.
337          *
338          * @param newFileBackedStreamFactory the factory for creating FileBackedOutputStream instances
339          * @return this Builder
340          */
341         public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) {
342             this.fileBackedStreamFactory = Preconditions.checkNotNull(newFileBackedStreamFactory);
343             return this;
344         }
345
346         /**
347          * Sets the maximum size (in bytes) for a message slice.
348          *
349          * @param newMessageSliceSize the maximum size (in bytes)
350          * @return this Builder
351          */
352         public Builder messageSliceSize(final int newMessageSliceSize) {
353             Preconditions.checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
354             this.messageSliceSize = newMessageSliceSize;
355             return this;
356         }
357
358         /**
359          * Sets the maximum number of tries for slicing a message. If exceeded, slicing fails. The default is
360          * defined by {@link #DEFAULT_MAX_SLICING_TRIES}
361          *
362          * @param newMaxSlicingTries the maximum number of tries
363          * @return this Builder
364          */
365         public Builder maxSlicingTries(final int newMaxSlicingTries) {
366             Preconditions.checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
367             this.maxSlicingTries = newMaxSlicingTries;
368             return this;
369         }
370
371         /**
372          * Sets the duration and time unit whereby sliced message state is purged from the cache and the associated
373          * failure callback is notified due to inactivity from the assembling component on the other end. By default,
374          * state is not purged due to inactivity.
375          *
376          * @param duration the length of time after which a state entry is purged
377          * @param unit the unit the duration is expressed in
378          * @return this Builder
379          */
380         public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
381             Preconditions.checkArgument(duration > 0, "duration must be > 0");
382             this.expireStateAfterInactivityDuration = duration;
383             this.expireStateAfterInactivityUnit = unit;
384             return this;
385         }
386
387         /**
388          * Sets the context for log messages.
389          *
390          * @param newLogContext the log context
391          * @return this Builder
392          */
393         public Builder logContext(final String newLogContext) {
394             this.logContext = Preconditions.checkNotNull(newLogContext);
395             return this;
396         }
397
398         /**
399          * Builds a new MessageSlicer instance.
400          *
401          * @return a new MessageSlicer
402          */
403         public MessageSlicer build() {
404             return new MessageSlicer(this);
405         }
406     }
407 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.