2 * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.messaging;
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.cache.RemovalListener;
16 import com.google.common.cache.RemovalNotification;
17 import com.google.common.io.ByteSource;
18 import java.io.IOException;
19 import java.io.ObjectInputStream;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeUnit;
22 import java.util.function.BiConsumer;
23 import javax.annotation.Nonnull;
24 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
25 import org.opendaylight.yangtools.concepts.Identifier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
30 * This class re-assembles messages sliced into smaller chunks by {@link MessageSlicer}.
32 * @author Thomas Pantelis
35 public class MessageAssembler implements AutoCloseable {
36 private static final Logger LOG = LoggerFactory.getLogger(MessageAssembler.class);
38 private final Cache<Identifier, AssembledMessageState> stateCache;
39 private final FileBackedOutputStreamFactory fileBackedStreamFactory;
40 private final BiConsumer<Object, ActorRef> assembledMessageCallback;
41 private final String logContext;
43 private MessageAssembler(Builder builder) {
44 this.fileBackedStreamFactory = Preconditions.checkNotNull(builder.fileBackedStreamFactory,
45 "FiledBackedStreamFactory cannot be null");
46 this.assembledMessageCallback = Preconditions.checkNotNull(builder.assembledMessageCallback,
47 "assembledMessageCallback cannot be null");
48 this.logContext = builder.logContext;
50 stateCache = CacheBuilder.newBuilder()
51 .expireAfterAccess(builder.expireStateAfterInactivityDuration, builder.expireStateAfterInactivityUnit)
52 .removalListener((RemovalListener<Identifier, AssembledMessageState>) notification ->
53 stateRemoved(notification)).build();
57 * Returns a new Builder for creating MessageAssembler instances.
59 * @return a Builder instance
61 public static Builder builder() {
66 * Checks if the given message is handled by this class. If so, it should be forwarded to the
67 * {@link #handleMessage(Object, ActorRef)} method
69 * @param message the message to check
70 * @return true if handled, false otherwise
72 public static boolean isHandledMessage(Object message) {
73 return message instanceof MessageSlice || message instanceof AbortSlicing;
78 LOG.debug("{}: Closing", logContext);
79 stateCache.invalidateAll();
83 * Checks for and removes assembled message state that has expired due to inactivity from the slicing component
86 public void checkExpiredAssembledMessageState() {
87 if (stateCache.size() > 0) {
93 * Invoked to handle message slices and other messages pertaining to this class.
95 * @param message the message
96 * @param sendTo the reference of the actor to which subsequent message slices should be sent
97 * @return true if the message was handled, false otherwise
99 public boolean handleMessage(final Object message, final @Nonnull ActorRef sendTo) {
100 if (message instanceof MessageSlice) {
101 LOG.debug("{}: handleMessage: {}", logContext, message);
102 onMessageSlice((MessageSlice) message, sendTo);
104 } else if (message instanceof AbortSlicing) {
105 LOG.debug("{}: handleMessage: {}", logContext, message);
106 onAbortSlicing((AbortSlicing) message);
113 private void onMessageSlice(final MessageSlice messageSlice, final ActorRef sendTo) {
114 final Identifier identifier = messageSlice.getIdentifier();
116 final AssembledMessageState state = stateCache.get(identifier, () -> createState(messageSlice));
117 processMessageSliceForState(messageSlice, state, sendTo);
118 } catch (ExecutionException e) {
119 final MessageSliceException messageSliceEx;
120 final Throwable cause = e.getCause();
121 if (cause instanceof MessageSliceException) {
122 messageSliceEx = (MessageSliceException) cause;
124 messageSliceEx = new MessageSliceException(String.format(
125 "Error creating state for identifier %s", identifier), cause);
128 messageSlice.getReplyTo().tell(MessageSliceReply.failed(identifier, messageSliceEx, sendTo),
129 ActorRef.noSender());
133 private AssembledMessageState createState(final MessageSlice messageSlice) throws MessageSliceException {
134 final Identifier identifier = messageSlice.getIdentifier();
135 if (messageSlice.getSliceIndex() == SlicedMessageState.FIRST_SLICE_INDEX) {
136 LOG.debug("{}: Received first slice for {} - creating AssembledMessageState", logContext, identifier);
137 return new AssembledMessageState(identifier, messageSlice.getTotalSlices(),
138 fileBackedStreamFactory, logContext);
141 LOG.debug("{}: AssembledMessageState not found for {} - returning failed reply", logContext, identifier);
142 throw new MessageSliceException(String.format(
143 "No assembled state found for identifier %s and slice index %s", identifier,
144 messageSlice.getSliceIndex()), true);
147 private void processMessageSliceForState(final MessageSlice messageSlice, AssembledMessageState state,
148 final ActorRef sendTo) {
149 final Identifier identifier = messageSlice.getIdentifier();
150 final ActorRef replyTo = messageSlice.getReplyTo();
151 Object reAssembledMessage = null;
152 synchronized (state) {
153 final int sliceIndex = messageSlice.getSliceIndex();
155 final MessageSliceReply successReply = MessageSliceReply.success(identifier, sliceIndex, sendTo);
156 if (state.addSlice(sliceIndex, messageSlice.getData(), messageSlice.getLastSliceHashCode())) {
157 LOG.debug("{}: Received last slice for {}", logContext, identifier);
159 reAssembledMessage = reAssembleMessage(state);
161 replyTo.tell(successReply, ActorRef.noSender());
162 removeState(identifier);
164 LOG.debug("{}: Added slice for {} - expecting more", logContext, identifier);
165 replyTo.tell(successReply, ActorRef.noSender());
167 } catch (MessageSliceException e) {
168 LOG.warn("{}: Error processing {}", logContext, messageSlice, e);
169 replyTo.tell(MessageSliceReply.failed(identifier, e, sendTo), ActorRef.noSender());
170 removeState(identifier);
174 if (reAssembledMessage != null) {
175 LOG.debug("{}: Notifying callback of re-assembled message {}", logContext, reAssembledMessage);
176 assembledMessageCallback.accept(reAssembledMessage, replyTo);
180 private Object reAssembleMessage(final AssembledMessageState state) throws MessageSliceException {
182 final ByteSource assembledBytes = state.getAssembledBytes();
183 try (ObjectInputStream in = new ObjectInputStream(assembledBytes.openStream())) {
184 return in.readObject();
187 } catch (IOException | ClassNotFoundException e) {
188 throw new MessageSliceException(String.format("Error re-assembling bytes for identifier %s",
189 state.getIdentifier()), e);
193 private void onAbortSlicing(AbortSlicing message) {
194 removeState(message.getIdentifier());
197 private void removeState(final Identifier identifier) {
198 LOG.debug("{}: Removing state for {}", logContext, identifier);
199 stateCache.invalidate(identifier);
202 private void stateRemoved(RemovalNotification<Identifier, AssembledMessageState> notification) {
203 if (notification.wasEvicted()) {
204 LOG.warn("{}: AssembledMessageState for {} was expired from the cache", logContext, notification.getKey());
206 LOG.debug("{}: AssembledMessageState for {} was removed from the cache due to {}", logContext,
207 notification.getKey(), notification.getCause());
210 notification.getValue().close();
214 boolean hasState(Identifier forIdentifier) {
215 boolean exists = stateCache.getIfPresent(forIdentifier) != null;
216 stateCache.cleanUp();
220 public static class Builder {
221 private FileBackedOutputStreamFactory fileBackedStreamFactory;
222 private BiConsumer<Object, ActorRef> assembledMessageCallback;
223 private long expireStateAfterInactivityDuration = 1;
224 private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
225 private String logContext = "<no-context>";
228 * Sets the factory for creating FileBackedOutputStream instances used for streaming messages.
230 * @param newFileBackedStreamFactory the factory for creating FileBackedOutputStream instances
231 * @return this Builder
233 public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) {
234 this.fileBackedStreamFactory = Preconditions.checkNotNull(newFileBackedStreamFactory);
239 * Sets the Consumer callback for assembled messages. The callback takes the assembled message and the
240 * original sender ActorRef as arguments.
242 * @param newAssembledMessageCallback the Consumer callback
243 * @return this Builder
245 public Builder assembledMessageCallback(final BiConsumer<Object, ActorRef> newAssembledMessageCallback) {
246 this.assembledMessageCallback = newAssembledMessageCallback;
251 * Sets the duration and time unit whereby assembled message state is purged from the cache due to
252 * inactivity from the slicing component on the other end. By default, state is purged after 1 minute of
255 * @param duration the length of time after which a state entry is purged
256 * @param unit the unit the duration is expressed in
257 * @return this Builder
259 public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
260 Preconditions.checkArgument(duration > 0, "duration must be > 0");
261 this.expireStateAfterInactivityDuration = duration;
262 this.expireStateAfterInactivityUnit = unit;
267 * Sets the context for log messages.
269 * @param newLogContext the log context
270 * @return this Builder
272 public Builder logContext(final String newLogContext) {
273 this.logContext = newLogContext;
278 * Builds a new MessageAssembler instance.
280 * @return a new MessageAssembler
282 public MessageAssembler build() {
283 return new MessageAssembler(this);