Add isComplete callback to commitEntry
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / StackedOutboundQueueNoBarrier.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10
11 import com.google.common.util.concurrent.FutureCallback;
12
13 import io.netty.channel.Channel;
14
15 import java.util.function.Function;
16
17 import javax.annotation.Nonnull;
18
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
22
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 /**
27  * Class is designed for stacking Statistics and propagate immediate response for all
28  * another requests.
29  */
30 public class StackedOutboundQueueNoBarrier extends AbstractStackedOutboundQueue {
31
32     private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueueNoBarrier.class);
33
34     StackedOutboundQueueNoBarrier(final AbstractOutboundQueueManager<?, ?> manager) {
35         super(manager);
36     }
37
38     /*
39      * This method is expected to be called from multiple threads concurrently
40      */
41     @Override
42     public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback,
43             final Function<OfHeader, Boolean> isCompletedFunction) {
44         final OutboundQueueEntry entry = getEntry(xid);
45
46         if (message instanceof FlowModInput) {
47             callback.onSuccess(null);
48             entry.commit(message, null, isCompletedFunction);
49         } else {
50             entry.commit(message, callback, isCompletedFunction);
51         }
52
53         LOG.trace("Queue {} committed XID {}", this, xid);
54         manager.ensureFlushing();
55     }
56
57     @Override
58     int writeEntries(@Nonnull final Channel channel, final long now) {
59         // Local cache
60         StackedSegment segment = firstSegment;
61         int entries = 0;
62
63         while (channel.isWritable()) {
64             final OutboundQueueEntry entry = segment.getEntry(flushOffset);
65             if (!entry.isCommitted()) {
66                 LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid()
67                         + flushOffset, segment, flushOffset);
68                 break;
69             }
70
71             LOG.trace("Queue {} flushing entry at offset {}", this, flushOffset);
72             final OfHeader message = entry.takeMessage();
73             flushOffset++;
74             entries++;
75
76             if (message != null) {
77                 manager.writeMessage(message, now);
78             } else {
79                 entry.complete(null);
80             }
81
82             if (flushOffset >= StackedSegment.SEGMENT_SIZE) {
83                 /*
84                  * Slow path: purge the current segment unless it's the last one.
85                  * If it is, we leave it for replacement when a new reservation
86                  * is run on it.
87                  * This costs us two slow paths, but hey, this should be very rare,
88                  * so let's keep things simple.
89                  */
90                 synchronized (unflushedSegments) {
91                     LOG.debug("Flush offset {} unflushed segments {}", flushOffset, unflushedSegments.size());
92
93                     // We may have raced ahead of reservation code and need to allocate a segment
94                     ensureSegment(segment, flushOffset);
95
96                     // Remove the segment, update the firstSegment and reset flushOffset
97                     final StackedSegment oldSegment = unflushedSegments.remove(0);
98                     oldSegment.completeAll();
99                     uncompletedSegments.remove(oldSegment);
100                     oldSegment.recycle();
101
102                     // Reset the first segment and add it to the uncompleted list
103                     segment = unflushedSegments.get(0);
104                     uncompletedSegments.add(segment);
105
106                     // Update the shutdown offset
107                     if (shutdownOffset != null) {
108                         shutdownOffset -= StackedSegment.SEGMENT_SIZE;
109                     }
110
111                     // Allow reservations back on the fast path by publishing the new first segment
112                     firstSegment = segment;
113
114                     flushOffset = 0;
115                     LOG.debug("Queue {} flush moved to segment {}", this, segment);
116                 }
117             }
118         }
119
120         return entries;
121     }
122
123 }