Class FennelPipeTupleIter

  extended by org.eigenbase.runtime.AbstractTupleIter
      extended by net.sf.farrago.runtime.FennelAbstractTupleIter
          extended by net.sf.farrago.runtime.FennelPipeTupleIter
All Implemented Interfaces:
TupleIter, ClosableAllocation
Direct Known Subclasses:

public class FennelPipeTupleIter
extends FennelAbstractTupleIter

FennelPipeTupleIter implements the TupleIter interface, receiving data from a producer as ByteBuffer objects, and unmarshalling them to a consumer.

A FennelPipeTupleIter has a C++ peer, a JavaSinkExecStream. The peer sends marshalled data, wrapped as a ByteBuffer. The reader keeps a current buffer, from which it unmarshals rows on demand, and a queue of buffers to read next. The queue is synchronized, but it remains available to the writer between TupleIter calls (fetchNext()). If the queue were unavailable for a long time, the peer XO would block, which is a severe problem for a single-threaded XO scheduler.

Version:
$Id: //open/dev/farrago/src/net/sf/farrago/runtime/ $
Author:
Julian Hyde, Stephan Zuercher

Nested Class Summary
Nested classes/interfaces inherited from interface org.eigenbase.runtime.TupleIter
TupleIter.NoDataReason, TupleIter.TimeoutException
Field Summary
private  ArrayQueue<ByteBuffer> moreBuffers
          Buffers from the writer that have not yet been read.
protected static Logger tracer
Fields inherited from class net.sf.farrago.runtime.FennelAbstractTupleIter
bufferAsArray, byteBuffer, tupleReader
Fields inherited from interface org.eigenbase.runtime.TupleIter
Constructor Summary
FennelPipeTupleIter(FennelTupleReader tupleReader)
          Creates a new FennelPipeTupleIter object.
Method Summary
 void closeAllocation()
          Closes this object.
private  ByteBuffer dequeue()
          Gets the head buffer from the queue.
private  void enqueue(ByteBuffer bb)
          Adds a buffer to the buffer queue.
 ByteBuffer getByteBuffer(int size)
          Gets a direct ByteBuffer suitable for write(ByteBuffer, int).
protected  int populateBuffer()
          Populates the buffer with a new batch of data, and returns the size in bytes.
 void restart()
          Restarts this iterator, so that a subsequent call to TupleIter.fetchNext() returns the first element in the collection being iterated.
protected  void traceNext(Object val)
 void write(ByteBuffer bb, int bblen)
          Writes the contents of a byte buffer into this iterator.
Methods inherited from class net.sf.farrago.runtime.FennelAbstractTupleIter
fetchNext, getStatus, requestData
Methods inherited from class org.eigenbase.runtime.AbstractTupleIter
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Field Detail


protected static final Logger tracer


private final ArrayQueue<ByteBuffer> moreBuffers
Buffers from the writer that have not yet been read.

Constructor Detail


public FennelPipeTupleIter(FennelTupleReader tupleReader)
Creates a new FennelPipeTupleIter object.

Parameters:
tupleReader - FennelTupleReader to use to interpret Fennel data
Method Detail


private void enqueue(ByteBuffer bb)
Adds a buffer to the buffer queue. The writer peer calls this.


private ByteBuffer dequeue()
Gets the head buffer from the queue. If the queue is empty, blocks until a buffer arrives.
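
The enqueue/dequeue handoff described above can be pictured with a minimal sketch. This is not the Farrago source: the class name BufferQueueSketch is hypothetical, java.util.ArrayDeque stands in for ArrayQueue, and the use of wait/notifyAll is an assumption about how the blocking might be done.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

/** Hypothetical stand-in for the private buffer queue; not the Farrago implementation. */
class BufferQueueSketch {
    // ArrayDeque is used here in place of ArrayQueue (an assumption).
    private final ArrayDeque<ByteBuffer> moreBuffers = new ArrayDeque<>();

    /** Writer side: appends a buffer and wakes any reader blocked in dequeue(). */
    void enqueue(ByteBuffer bb) {
        synchronized (moreBuffers) {
            moreBuffers.addLast(bb);
            moreBuffers.notifyAll();
        }
    }

    /** Reader side: blocks while the queue is empty, then removes and returns the head. */
    ByteBuffer dequeue() {
        synchronized (moreBuffers) {
            while (moreBuffers.isEmpty()) {
                try {
                    moreBuffers.wait(); // releases the lock while waiting
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for a buffer", e);
                }
            }
            return moreBuffers.removeFirst();
        }
    }
}
```

In this sketch the lock is held only for the duration of a single queue operation, which matches the stated goal of keeping the queue available to the writer between reader calls.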


public void restart()
Description copied from interface: TupleIter
Restarts this iterator, so that a subsequent call to TupleIter.fetchNext() returns the first element in the collection being iterated.

Specified by:
restart in interface TupleIter
restart in class FennelAbstractTupleIter


protected void traceNext(Object val)
traceNext in class FennelAbstractTupleIter


public void closeAllocation()
Description copied from interface: ClosableAllocation
Closes this object.


protected int populateBuffer()
Description copied from class: FennelAbstractTupleIter
Populates the buffer with a new batch of data, and returns the size in bytes. The buffer position is set to the start. The call may block until the buffer is filled or it may return an indication that there is no data currently available. A subclass can implement this to fill the buffer itself, or it can work by allowing an outside object to fill the buffer.

Specified by:
populateBuffer in class FennelAbstractTupleIter
Returns:
the number of bytes now in the buffer; 0 means end of stream, and a value less than 0 means that no data is currently available
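
As an illustration of the return-value convention above, a caller might classify the result as follows. The class PopulateResultSketch and the enum Outcome are hypothetical names introduced for this sketch only; they are not part of the Farrago API.

```java
/** Hypothetical helper that interprets a populateBuffer()-style return value. */
class PopulateResultSketch {
    enum Outcome { DATA, END_OF_STREAM, NO_DATA_YET }

    /** Maps a byte count to one of the three documented cases. */
    static Outcome classify(int byteCount) {
        if (byteCount > 0) {
            return Outcome.DATA;          // buffer holds byteCount bytes
        }
        if (byteCount == 0) {
            return Outcome.END_OF_STREAM; // producer has finished
        }
        return Outcome.NO_DATA_YET;       // nothing available right now
    }
}
```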


public ByteBuffer getByteBuffer(int size)
Gets a direct ByteBuffer suitable for write(ByteBuffer, int). The C++ caller may pin the backing array (JNI GetByteArrayElements), copy data into it, and then pass the ByteBuffer back by calling write(ByteBuffer, int).
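
Whether a buffer exposes a backing array can be checked with ByteBuffer.hasArray(). The sketch below shows one way a consumer could share the array when it is accessible and fall back to a copy otherwise. The class BackingArraySketch and the method toBytes are hypothetical; this is not how FennelPipeTupleIter is known to handle the buffer.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: zero-copy access when a backing array exists, copy otherwise. */
class BackingArraySketch {
    /** Returns the first len bytes of bb, sharing the backing array when that is possible. */
    static byte[] toBytes(ByteBuffer bb, int len) {
        if (bb.hasArray() && bb.arrayOffset() == 0 && bb.array().length == len) {
            return bb.array(); // shares storage with bb, no copy
        }
        byte[] out = new byte[len];
        ByteBuffer view = bb.duplicate(); // independent position and limit
        view.clear();                     // position 0, limit = capacity
        view.limit(len);
        view.get(out);                    // copies len bytes starting at index 0
        return out;
    }
}
```

A buffer from ByteBuffer.allocate always has an accessible backing array. For ByteBuffer.allocateDirect the Java specification leaves this unspecified, so the copy path has to exist.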


public void write(ByteBuffer bb,
                  int bblen)
           throws Throwable
Writes the contents of a byte buffer into this iterator. To avoid an extra copy here, the buffer should be direct and expose its backing array (i.e. byteBuffer.hasArray() == true). Unfortunately, the result of JNI NewDirectByteBuffer() need not have a backing array.

This method is called by the producer, typically from JNI.

The limit of the byte buffer is ignored; the bblen parameter is used instead.

Parameters:
bb - a ByteBuffer containing the new data
bblen - size of bb in bytes; 0 means end of data
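
The rule that bblen, not the buffer's limit, determines how much data is taken can be sketched as follows. The class WriteSketch, the method copyPayload, and the use of null to signal end of data are all assumptions made for illustration; the actual write method may avoid the copy entirely.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration of honoring bblen instead of the buffer's limit. */
class WriteSketch {
    /**
     * Copies the first bblen bytes of bb, ignoring bb's own position and limit.
     * Returns null when bblen is 0, which this sketch uses to mean end of data.
     */
    static ByteBuffer copyPayload(ByteBuffer bb, int bblen) {
        if (bblen == 0) {
            return null;
        }
        ByteBuffer view = bb.duplicate(); // leaves bb's position and limit untouched
        view.clear();                     // position 0, limit = capacity
        view.limit(bblen);                // take exactly bblen bytes
        ByteBuffer copy = ByteBuffer.allocate(bblen);
        copy.put(view);
        copy.flip();                      // ready for reading from index 0
        return copy;
    }
}
```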