net.sf.farrago.runtime
Class FennelPipeTupleIter

java.lang.Object
  extended by org.eigenbase.runtime.AbstractTupleIter
      extended by net.sf.farrago.runtime.FennelAbstractTupleIter
          extended by net.sf.farrago.runtime.FennelPipeTupleIter
All Implemented Interfaces:
TupleIter, ClosableAllocation
Direct Known Subclasses:
FennelPipeIterator

public class FennelPipeTupleIter
extends FennelAbstractTupleIter

FennelPipeTupleIter implements the TupleIter interface, receiving data from a producer as ByteBuffer objects, and unmarshalling them to a consumer.

A FennelPipeTupleIter has a C++ peer, a JavaSinkExecstream. The peer sends marshalled data, wrapped as a ByteBuffer. The reader has a current buffer from which it unmarshals rows on demand, and a queue of buffers to read next. The queue is synchronized; but it is left available to the writer between TupleIter calls (#fetchNext()). Otherwise, if it were unavailable for a long time, the peer XO would block, which is a severe problem for a single-thread XO scheduler.

Version:
$Id: //open/dev/farrago/src/net/sf/farrago/runtime/FennelPipeTupleIter.java#9 $
Author:
Julian Hyde, Stephan Zuercher

Nested Class Summary
 
Nested classes/interfaces inherited from interface org.eigenbase.runtime.TupleIter
TupleIter.NoDataReason, TupleIter.TimeoutException
 
Field Summary
private  ArrayQueue<ByteBuffer> moreBuffers
          buffers from the writer, not yet read
protected static Logger tracer
           
 
Fields inherited from class net.sf.farrago.runtime.FennelAbstractTupleIter
bufferAsArray, byteBuffer, tupleReader
 
Fields inherited from interface org.eigenbase.runtime.TupleIter
EMPTY_ITERATOR
 
Constructor Summary
FennelPipeTupleIter(FennelTupleReader tupleReader)
          creates a new FennelPipeTupleIter object.
 
Method Summary
 void closeAllocation()
          Closes this object.
private  ByteBuffer dequeue()
          Gets the head buffer from the queue.
private  void enqueue(ByteBuffer bb)
          adds a buffer to the buffer queue.
 ByteBuffer getByteBuffer(int size)
          Gets a direct ByteBuffer suitable for #write.
protected  int populateBuffer()
          Populates the buffer with a new batch of data, and returns the size in bytes.
 void restart()
          Restarts this iterator, so that a subsequent call to TupleIter.fetchNext() returns the first element in the collection being iterated.
protected  void traceNext(Object val)
           
 void write(ByteBuffer bb, int bblen)
          Writes the contents of a byte buffer into this iterator.
 
Methods inherited from class net.sf.farrago.runtime.FennelAbstractTupleIter
fetchNext, getStatus, requestData
 
Methods inherited from class org.eigenbase.runtime.AbstractTupleIter
setTimeout
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

tracer

protected static final Logger tracer

moreBuffers

private final ArrayQueue<ByteBuffer> moreBuffers
buffers from the writer, not yet read

Constructor Detail

FennelPipeTupleIter

public FennelPipeTupleIter(FennelTupleReader tupleReader)
creates a new FennelPipeTupleIter object.

Parameters:
tupleReader - FennelTupleReader to use to interpret Fennel data
Method Detail

enqueue

private void enqueue(ByteBuffer bb)
adds a buffer to the buffer queue. The writer peer calls this.


dequeue

private ByteBuffer dequeue()
Gets the head buffer from the queue. If the queue is empty, blocks until a buffer arrives.


restart

public void restart()
Description copied from interface: TupleIter
Restarts this iterator, so that a subsequent call to TupleIter.fetchNext() returns the first element in the collection being iterated.

Specified by:
restart in interface TupleIter
Overrides:
restart in class FennelAbstractTupleIter

traceNext

protected void traceNext(Object val)
Overrides:
traceNext in class FennelAbstractTupleIter

closeAllocation

public void closeAllocation()
Description copied from interface: ClosableAllocation
Closes this object.


populateBuffer

protected int populateBuffer()
Description copied from class: FennelAbstractTupleIter
Populates the buffer with a new batch of data, and returns the size in bytes. The buffer position is set to the start. The call may block until the buffer is filled or it may return an indication that there is no data currently available. A subclass can implement this to fill the buffer itself, or it can work by allowing an outside object to fill the buffer.

Specified by:
populateBuffer in class FennelAbstractTupleIter
Returns:
The number of bytes now in the buffer. 0 means end of stream. Less than 0 means no data currently available.

getByteBuffer

public ByteBuffer getByteBuffer(int size)
Gets a direct ByteBuffer suitable for #write. The C++ caller may pin the backing array (JNI GetByteArrayElements), copy data, and then pass back the ByteBuffer by calling #write.


write

public void write(ByteBuffer bb,
                  int bblen)
           throws Throwable
Writes the contents of a byte buffer into this iterator. To avoid an extra copy here, the buffer should be direct and expose its backing array (ie byteBufer.hasArray() == true). (Unfortunately the result of JNI NewDirectByteBuffer() need not have a backing array).

This method is called by the producer, typically from JNI.

The limit of the byte buffer is ignored; the bblen parameter is used instead.

Parameters:
bb - A ByteBuffer containing the new data
bblen - size of bb in bytes; 0 means end of data.
Throws:
Throwable