org.eigenbase.runtime
Class QueueIterator

java.lang.Object
  extended by org.eigenbase.runtime.QueueIterator
All Implemented Interfaces:
Iterator
Direct Known Subclasses:
ThreadIterator

public class QueueIterator
extends Object
implements Iterator

Adapter that exposes a 'push' producer as an Iterator. Supports one or more producers feeding into a single consumer. The consumer and the producers must each run in its own thread. When there are several producers the data is merged as it arrives: no sorting.

By default, the queue contains at most one object (implemented via SynchronousQueue), but this can be customized by supplying an alternate implementation (e.g. ArrayBlockingQueue) to the constructor. If you call next, your thread will wait until a producer thread calls put(java.lang.Object) or done(java.lang.Throwable). Nulls are allowed. If a producer has an error, it can pass it to the consumer via done(java.lang.Throwable).

Since:
Oct 20, 2003
Version:
$Id: //open/dev/farrago/src/org/eigenbase/runtime/QueueIterator.java#13 $
Author:
jhyde

Nested Class Summary
private static class QueueIterator.EndOfQueue
          Sentinel object.
static class QueueIterator.TimeoutException
          Thrown by hasNext(long) and next(long) to indicate that operation timed out before rows were available.
private static class QueueIterator.WrappedNull
          A null masquerading as a real object.
 
Field Summary
protected  boolean hasNext
          false when Iterator is finished
protected  Object next
          next Iterator value (nulls are represented via #WRAPPED_NULL)
private  int numProducers
           
protected  BlockingQueue queue
           
protected  Throwable throwable
           
private  EigenbaseLogger tracer
           
private static QueueIterator.WrappedNull WRAPPED_NULL
           
 
Constructor Summary
QueueIterator()
          default constructor (one producer, no tracer, SynchronousQueue)
QueueIterator(int n, Logger tracer)
           
QueueIterator(int n, Logger tracer, BlockingQueue queue)
           
 
Method Summary
protected  void checkTermination()
          Checks for end-of-queue, and throws an error if one has been set via done(Throwable).
 void done(Throwable throwable)
          Producer calls done to say that there are no more objects, setting throwable if there was an error.
 boolean hasNext()
           
 boolean hasNext(long timeoutMillis)
          As hasNext, but throws QueueIterator.TimeoutException if no row is available within the timeout.
 Object next()
           
 Object next(long timeoutMillis)
          As next, but throws QueueIterator.TimeoutException if no row is available within the timeout.
 boolean offer(Object o, long timeoutMillis)
          Producer calls offer to attempt to add another object (which may be null) with a timeout.
protected  void onEndOfQueue()
          Called (from the consumer thread context) just before the iterator returns false for hasNext().
 void put(Object o)
          Producer calls put to add another object (which may be null).
 void remove()
           
protected  void reset(int n)
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

WRAPPED_NULL

private static final QueueIterator.WrappedNull WRAPPED_NULL

numProducers

private int numProducers

tracer

private final EigenbaseLogger tracer

next

protected Object next
next Iterator value (nulls are represented via #WRAPPED_NULL)


hasNext

protected boolean hasNext
false when Iterator is finished


throwable

protected Throwable throwable

queue

protected BlockingQueue queue
Constructor Detail

QueueIterator

public QueueIterator()
default constructor (one producer, no tracer, SynchronousQueue)


QueueIterator

public QueueIterator(int n,
                     Logger tracer)
Parameters:
n - number of producers
tracer - trace to this Logger, or null.

QueueIterator

public QueueIterator(int n,
                     Logger tracer,
                     BlockingQueue queue)
Parameters:
n - number of producers
tracer - trace to this Logger, or null.
queue - BlockingQueue implementation, or null for default
Method Detail

reset

protected void reset(int n)

done

public void done(Throwable throwable)
Producer calls done to say that there are no more objects, setting throwable if there was an error.


hasNext

public boolean hasNext()
Specified by:
hasNext in interface Iterator

hasNext

public boolean hasNext(long timeoutMillis)
                throws QueueIterator.TimeoutException
As hasNext, but throws QueueIterator.TimeoutException if no row is available within the timeout.

Parameters:
timeoutMillis - Milliseconds to wait; less than or equal to zero means don't wait
Throws:
QueueIterator.TimeoutException

next

public Object next()
Specified by:
next in interface Iterator

next

public Object next(long timeoutMillis)
            throws QueueIterator.TimeoutException
As next, but throws QueueIterator.TimeoutException if no row is available within the timeout.

Parameters:
timeoutMillis - Milliseconds to wait; less than or equal to zero means don't wait
Throws:
QueueIterator.TimeoutException

put

public void put(Object o)
Producer calls put to add another object (which may be null).

Parameters:
o - object to put
Throws:
IllegalStateException - if this method is called after done(java.lang.Throwable)

offer

public boolean offer(Object o,
                     long timeoutMillis)
Producer calls offer to attempt to add another object (which may be null) with a timeout.

Parameters:
o - object to offer
timeoutMillis - Milliseconds to wait; less than or equal to zero means don't wait
Returns:
true if offer accepted
Throws:
IllegalStateException - if this method is called after done(java.lang.Throwable)

remove

public void remove()
Specified by:
remove in interface Iterator

checkTermination

protected void checkTermination()
Checks for end-of-queue, and throws an error if one has been set via done(Throwable).


onEndOfQueue

protected void onEndOfQueue()
Called (from the consumer thread context) just before the iterator returns false for hasNext(). Default implementation does nothing, but subclasses can use this for cleanup actions.