Class BlockingQueueStreamSource<T>
- java.lang.Object
  - org.pentaho.di.trans.streaming.common.BlockingQueueStreamSource<T>
- All Implemented Interfaces: StreamSource<T>
public abstract class BlockingQueueStreamSource<T> extends Object implements StreamSource<T>
Implementation of StreamSource which handles pause/resume logic, as well as creation of .rows() which generates a blocking iterable.
Child classes should implement StreamSource.open() to connect to a datasource. The child .open implementation will typically start a new thread that feeds rows of data to the acceptRows(List) method. Any resource cleanup should be included in a .close() implementation, along with a call to super.close() to complete the blocking iterable.
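The lifecycle described above can be sketched without the Pentaho classes. The example below is a minimal illustration under stated assumptions: SketchSource is a hypothetical stand-in that only mimics the open/acceptRows/close contract with a plain java.util.concurrent queue, and CountingSource and drain() are invented names. It is not the actual BlockingQueueStreamSource implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the parent class; NOT the Pentaho class.
abstract class SketchSource<T> {
  private final BlockingQueue<List<T>> queue = new LinkedBlockingQueue<>();
  // Sentinel marking the end of the stream; compared by identity only.
  private final List<T> endOfStream = new ArrayList<>();

  abstract void open();

  protected void acceptRows(List<T> rows) {
    queue.add(rows);
  }

  void close() {
    queue.add(endOfStream); // completes the blocking iteration in drain()
  }

  // Blocks until close() has been called, then returns every row received.
  List<T> drain() {
    List<T> out = new ArrayList<>();
    try {
      for (List<T> batch = queue.take(); batch != endOfStream; batch = queue.take()) {
        out.addAll(batch);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return out;
  }
}

// Hypothetical child class: open() starts a feeder thread, close() cleans up
// its own resources first and then delegates to super.close().
class CountingSource extends SketchSource<Integer> {
  private Thread feeder;

  @Override
  void open() {
    feeder = new Thread(() -> {
      acceptRows(Arrays.asList(1, 2));
      acceptRows(Arrays.asList(3));
    });
    feeder.start();
  }

  @Override
  void close() {
    try {
      feeder.join(); // resource cleanup: wait for the feeder thread to finish
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    super.close();
  }
}
```

In this sketch, calling open(), then close(), then drain() returns the rows in the order the feeder thread supplied them. Skipping super.close() would leave drain() blocked forever, which is the reason the child close() has to delegate to the parent.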
-
-
Field Summary
Modifier and Type | Field | Description
protected BaseStreamStep | streamStep |
-
Constructor Summary
Modifier | Constructor | Description
protected | BlockingQueueStreamSource(BaseStreamStep streamStep) |
-
Method Summary
Modifier and Type | Method | Description
protected void | acceptRows(List<T> rows) | Accept rows, blocking if currently paused or if there are no permits.
void | close() | Signals this stream is no longer in use and can clean up resources.
void | error(Throwable throwable) | Child implementations of this class can call .error() when an unexpected event occurs while passing rows to the acceptRows() method.
io.reactivex.Flowable<T> | flowable() | Returns the rows of data as an iterable.
void | pause() | Marks the source paused (if not already) and acquires the permit, which will cause acceptRows to block.
protected Object | readBytes(byte[] bytes) |
void | resume() | Resumes accepting input if paused, otherwise noop.
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.pentaho.di.trans.streaming.api.StreamSource
open
-
Field Detail
-
streamStep
protected final BaseStreamStep streamStep
-
-
Constructor Detail
-
BlockingQueueStreamSource
protected BlockingQueueStreamSource(BaseStreamStep streamStep)
-
-
Method Detail
-
flowable
public io.reactivex.Flowable<T> flowable()
Description copied from interface: StreamSource
Returns the rows of data as an iterable.
- Specified by: flowable in interface StreamSource<T>
-
close
public void close()
Description copied from interface: StreamSource
Signals this stream is no longer in use and can clean up resources.
- Specified by: close in interface StreamSource<T>
-
pause
public void pause()
Marks the source paused (if not already) and acquires the permit, which will cause acceptRows to block.
- Specified by: pause in interface StreamSource<T>
-
resume
public void resume()
Description copied from interface: StreamSource
Resumes accepting input if paused, otherwise noop.
- Specified by: resume in interface StreamSource<T>
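The pause/resume behavior described above (pause takes the permit only if not already paused, resume is a no-op unless paused) can be illustrated with a single-permit semaphore. This is a sketch under assumptions: PauseGate and tryAccept() are hypothetical names, and the real class may manage its permits differently.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical gate illustrating the documented behavior; NOT the Pentaho code.
class PauseGate {
  private final Semaphore permit = new Semaphore(1);
  private final AtomicBoolean paused = new AtomicBoolean(false);

  void pause() {
    // Only the transition from running to paused takes the permit,
    // so a second pause() call cannot deadlock on itself.
    if (paused.compareAndSet(false, true)) {
      permit.acquireUninterruptibly();
    }
  }

  void resume() {
    // No-op unless currently paused, so the permit is never released twice.
    if (paused.compareAndSet(true, false)) {
      permit.release();
    }
  }

  // Non-blocking probe: true if a producer calling acceptRows could proceed now.
  boolean tryAccept() {
    if (permit.tryAcquire()) {
      permit.release();
      return true;
    }
    return false;
  }
}
```

A producer that used a blocking acquire instead of the tryAccept() probe would simply wait inside acceptRows until resume() returns the permit.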
-
acceptRows
protected void acceptRows(List<T> rows)
Accept rows, blocking if currently paused or if there are no permits. Child classes should implement open() so that it passes external row events to the acceptRows method.
-
error
public void error(Throwable throwable)
Child implementations of this class can call .error() when an unexpected event occurs while passing rows to the acceptRows() method. For example, if an implementation includes a poll loop which retrieves data from a message queue and passes chunks of rows to .acceptRows, a connection failure to the message queue should be handled by calling error() with the connection exception. This ensures that any consumers of the rows() iterable receive that error.
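The poll-loop scenario above might look roughly like the following. Everything here is an assumption made for illustration: MessageQueueClient, ErrorAwareSource, drainInto() and ErrorDemo are hypothetical names, and the error is delivered through a plain queue rather than through the reactive stream the real class exposes.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical client for some message queue; poll() may fail.
interface MessageQueueClient {
  List<String> poll() throws IOException;
}

// Hypothetical stand-in for the source; NOT the Pentaho class.
class ErrorAwareSource {
  // One queue entry: either a batch of rows or the error that ended the stream.
  private static final class Event {
    final List<String> rows;
    final Throwable error;

    Event(List<String> rows, Throwable error) {
      this.rows = rows;
      this.error = error;
    }
  }

  private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();

  void acceptRows(List<String> rows) {
    queue.add(new Event(rows, null));
  }

  void error(Throwable throwable) {
    queue.add(new Event(null, throwable));
  }

  // Poll loop: forwards batches until the client fails, then reports the failure.
  void pollLoop(MessageQueueClient client) {
    try {
      while (true) {
        acceptRows(client.poll());
      }
    } catch (IOException e) {
      error(e); // hand the connection failure to the consumer side
    }
  }

  // Consumer side: collects rows into sink and returns the error that ended the stream.
  Throwable drainInto(List<String> sink) {
    try {
      while (true) {
        Event event = queue.take();
        if (event.error != null) {
          return event.error;
        }
        sink.addAll(event.rows);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return e;
    }
  }
}

class ErrorDemo {
  static String run() {
    Iterator<List<String>> batches =
        Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")).iterator();
    // Fake client: serves two batches, then simulates a lost connection.
    MessageQueueClient client = () -> {
      if (batches.hasNext()) {
        return batches.next();
      }
      throw new IOException("connection lost");
    };
    ErrorAwareSource source = new ErrorAwareSource();
    source.pollLoop(client);
    List<String> seen = new ArrayList<>();
    Throwable failure = source.drainInto(seen);
    return "rows=" + seen + " error=" + failure.getMessage();
  }
}
```

The point of the sketch is the ordering: rows accepted before the failure still reach the consumer, and the exception arrives after them instead of being lost inside the producer thread.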
-
readBytes
protected Object readBytes(byte[] bytes)
-
-