Package org.pentaho.di.kf
Class KFData
- java.lang.Object
  - org.pentaho.di.trans.step.BaseStepData
    - org.pentaho.di.kf.KFData
- All Implemented Interfaces:
org.pentaho.di.trans.step.StepDataInterface
public class KFData extends org.pentaho.di.trans.step.BaseStepData implements org.pentaho.di.trans.step.StepDataInterface
Holds temporary data and various utility routines for loading serialized knowledge flows, etc.
- Version:
  $Revision: 43074 $
- Author:
  Mark Hall (mhall{[at]}pentaho{[dot]}Com)
Field Summary
- static String[] INJECT_EVENTS: Allowable types of events to generate to inject data into the knowledge flow
- protected boolean m_bufferingComplete: True when buffering for header creation has been completed for streaming setups
- protected String m_classAttributeName: The name of the class attribute (if any)
- protected int m_currentRow: The current row number
- protected boolean m_hasNominalAtts: True if incoming data has nominal attributes
- protected int m_incrementalStatus: Status to use for incremental instance events
- protected org.pentaho.dm.commons.ArffMeta[] m_injectArffMetas: Meta data for the fields to be injected into the knowledge flow
- protected String m_injectEventName: The name of the event to use for injecting (must be "instance" for streaming)
- protected int[] m_injectFieldIndexes: Mapping to fields in incoming Kettle data
- protected org.pentaho.dm.commons.ArffMeta[] m_injectFields: Meta data for the ARFF instances input to the inject step
- protected boolean m_injectSetupOK: If the inject setup validates, then we can process rows for input into the flow
- protected weka.gui.beans.BeanInstance m_injectStep: The step in the knowledge flow to inject into
- protected int m_k: The size of the sample
- protected org.pentaho.di.core.logging.LogChannelInterface m_log: Logging
- protected Map<Object,Object>[] m_nominalVals: Maps to hold the nominal values of the ARFF data for the inject step
- protected org.pentaho.di.core.row.RowMetaInterface m_outputRowMeta: The (eventual) outgoing row meta data
- protected Random m_random: Random number generator
- protected List<Object[]> m_sample: Holds sampled rows
- protected String m_sampleRelationName: Relation name for Instances created from the sampled rows
- protected boolean m_streamData: True if data is to be injected one instance at a time
- protected weka.core.Instances m_streamingHeader: Header for instances that are to be streamed
- static String[] OUTPUT_EVENTS: Allowable types of events that we can listen for from the knowledge flow
Constructor Summary
- KFData()
Method Summary
- protected void allocate(int num): Allocate an array to hold meta data for the ARFF instances.
- void cleanUp(): Free memory used by the reservoir/cache.
- protected static weka.gui.beans.BeanInstance findKFStep(Vector flow, String stepName)
- protected static String flowToXML(Vector<Vector<?>> flow)
- void flushStreamingBuffer(org.pentaho.di.core.row.RowMetaInterface inputMeta, Object[] inputRow)
- protected static ArrayList<String> getAllAllowableOutputEventNames(Vector flow, String compName): Get a list of all event names that we can accept from the supplied knowledge flow component.
- protected static ArrayList<String> getAllAllowableOutputStepNames(Vector flow): Return a list of all valid output steps in the flow (i.e. those that produce at least one event that we can accept).
- protected static ArrayList<String> getAllInjectStepNames(Vector flow): Look for all knowledge flow components that are of type "KettleInject".
- protected boolean getBufferingForStreaming(): Returns true if streaming has been selected and there are nominal attributes in the incoming data.
- protected static ArrayList<String> getEventsForNamedStep(Vector flow, String compName): Looks for the named step, and then returns all events (from those that we can generate) that this step can accept at present.
- org.pentaho.dm.commons.ArffMeta[] getInjectFields()
- protected boolean getInjectSetupOK(): Get the status of the flow with regard to the inject step.
- protected org.pentaho.di.core.row.RowMetaInterface getOutputRowMeta(): Get the output row meta data.
- protected boolean getStreamData(): Get whether data is to be streamed to the flow.
- protected void initializeReservoir(int sampleSize, int seed): Initialize the reservoir/cache for the requested sample (or cache) size.
- protected void injectData(weka.core.Instances data, Vector beans): Inject batch data into the knowledge flow.
- protected void injectInstance(weka.core.Instance inst): Inject a single instance into the knowledge flow.
- protected static Vector<Vector<?>> loadSerializedKF(File kfFile): Loads a serialized knowledge flow.
- protected void processRow(Object[] inputRow, org.pentaho.di.core.row.RowMetaInterface inputMeta): Processes a single incoming row.
- protected weka.core.Instances reservoirToInstances(org.pentaho.di.core.row.RowMetaInterface inputRowMeta): Convert the contents of the reservoir into a set of Instances.
- protected void setClassAttributeName(String classAttName): Set the name of the attribute to use as the class.
- protected void setIncrementalStatus(int status): Set the status of the current incremental event (FORMAT_AVAILABLE or INSTANCE_AVAILABLE).
- protected void setInjectEventName(String injectEventName): Set the name of the event to use for injecting data.
- protected void setInjectFieldIndexes(int[] injectFieldIndexes, org.pentaho.dm.commons.ArffMeta[] arffMetas): Set the indexes of the fields to inject into the knowledge flow.
- void setInjectFields(org.pentaho.dm.commons.ArffMeta[] arffMeta)
- protected void setInjectSetupOK(boolean ok): Sets the status of the flow with regard to the step to inject into.
- protected void setInjectStep(weka.gui.beans.BeanInstance injectStep): Set the knowledge flow step to inject data into.
- void setLog(org.pentaho.di.core.logging.LogChannelInterface log)
- protected void setOutputRowMeta(org.pentaho.di.core.row.RowMetaInterface rmi): Set the meta data for the incoming rows (later modified into the output format by getFields() in the meta class).
- protected void setSampleRelationName(String relationName): Set the relation name for the sample.
- protected void setStreamData(boolean streamData): Set whether the flow is to have data streamed to it (i.e. instance events).
- protected void setupArffMeta(org.pentaho.di.core.row.RowMetaInterface rmi): Sets up the ArffMeta array based on the incoming Kettle row format.
- protected static void validateInjectSetup(Vector flow, String injectStepName, String injectEventName): Attempts to find the requested inject step and checks whether the requested event type is one that this step generates (and will allow a connection).
- protected static void validateOutputSetup(Vector flow, Object outputListener, String outputStepName, String outputEventName): Attempts to find the requested output step, checks that the requested output event type can be generated, and then tries to register the supplied listener with the output step.
- protected static Vector<Vector<?>> xmlToFlow(String xml)
Methods inherited from class org.pentaho.di.trans.step.BaseStepData
getStatus, isDisposed, isEmpty, isFinished, isIdle, isInitialising, isRunning, isStopped, setStatus
Field Detail
-
m_outputRowMeta
protected org.pentaho.di.core.row.RowMetaInterface m_outputRowMeta
The (eventual) outgoing row meta data
-
m_injectFields
protected org.pentaho.dm.commons.ArffMeta[] m_injectFields
Meta data for the ARFF instances input to the inject step
-
m_injectSetupOK
protected boolean m_injectSetupOK
If the inject setup validates, then we can process rows for input into the flow
-
m_injectArffMetas
protected org.pentaho.dm.commons.ArffMeta[] m_injectArffMetas
Meta data for the fields to be injected into the knowledge flow
-
m_injectFieldIndexes
protected int[] m_injectFieldIndexes
Mapping to fields in incoming Kettle data
-
m_hasNominalAtts
protected boolean m_hasNominalAtts
True if incoming data has nominal attributes
-
m_streamData
protected boolean m_streamData
True if data is to be injected one instance at a time
-
m_incrementalStatus
protected int m_incrementalStatus
Status to use for incremental instance events
-
m_streamingHeader
protected weka.core.Instances m_streamingHeader
Header for instances that are to be streamed. If there are no nominal attributes in the incoming data, then this will get constructed on the first incoming row. Otherwise, it will be constructed after we have seen x rows (where x is the number of rows to cache in order to determine legal nominal values).
-
m_classAttributeName
protected String m_classAttributeName
The name of the class attribute (if any).
-
m_bufferingComplete
protected boolean m_bufferingComplete
True when buffering for header creation has been completed for streaming setups
-
m_injectStep
protected weka.gui.beans.BeanInstance m_injectStep
The step in the knowledge flow to inject into
-
m_injectEventName
protected String m_injectEventName
The name of the event to use for injecting (must be "instance" for streaming)
-
m_nominalVals
protected Map<Object,Object>[] m_nominalVals
Maps to hold the nominal values of the ARFF data for the inject step
-
m_k
protected int m_k
the size of the sample
-
m_currentRow
protected int m_currentRow
the current row number
-
m_sampleRelationName
protected String m_sampleRelationName
Relation name for Instances created from the sampled rows
-
m_random
protected Random m_random
random number generator
-
m_log
protected org.pentaho.di.core.logging.LogChannelInterface m_log
logging
-
INJECT_EVENTS
public static final String[] INJECT_EVENTS
Allowable types of events to generate to inject data into the knowledge flow
-
OUTPUT_EVENTS
public static final String[] OUTPUT_EVENTS
Allowable types of events that we can listen for from the knowledge flow
-
Method Detail
-
setLog
public void setLog(org.pentaho.di.core.logging.LogChannelInterface log)
-
setInjectFields
public void setInjectFields(org.pentaho.dm.commons.ArffMeta[] arffMeta)
-
getInjectFields
public org.pentaho.dm.commons.ArffMeta[] getInjectFields()
-
allocate
protected void allocate(int num)
Allocate an array to hold meta data for the ARFF instances.
Parameters:
num - number of meta data objects to allocate
-
setupArffMeta
protected void setupArffMeta(org.pentaho.di.core.row.RowMetaInterface rmi)
Sets up the ArffMeta array based on the incoming Kettle row format.
Parameters:
rmi - a RowMetaInterface value
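setupArffMeta is documented only as building the ArffMeta array from the incoming row format. The following is a minimal, hypothetical sketch of that kind of per-field mapping, assuming numeric Kettle types become ARFF NUMERIC, strings and booleans become NOMINAL, and dates become DATE. KettleType, ArffType and ArffTypeMapper are illustrative stand-ins, not the Kettle or org.pentaho.dm.commons.ArffMeta API, and the real mapping may differ.

```java
import java.util.List;

/**
 * Hypothetical sketch: derive an ARFF-style attribute kind for each incoming
 * field and note whether any field is nominal (cf. m_hasNominalAtts).
 * The type names and the mapping are assumptions, not KFData's actual rules.
 */
final class ArffTypeMapper {
    /** Simplified stand-in for Kettle value types. */
    enum KettleType { NUMBER, INTEGER, BIGNUMBER, STRING, BOOLEAN, DATE }

    /** Simplified stand-in for ARFF attribute kinds. */
    enum ArffType { NUMERIC, NOMINAL, DATE }

    private ArffTypeMapper() {}

    /** Assumed mapping from one field type to one ARFF attribute kind. */
    static ArffType map(KettleType t) {
        switch (t) {
            case NUMBER:
            case INTEGER:
            case BIGNUMBER:
                return ArffType.NUMERIC;
            case DATE:
                return ArffType.DATE;
            case STRING:
            case BOOLEAN:
                return ArffType.NOMINAL; // legal values must be collected before a header exists
            default:
                throw new IllegalArgumentException("Unsupported type: " + t);
        }
    }

    /** One ARFF kind per incoming field, in row order. */
    static ArffType[] setup(List<KettleType> incoming) {
        ArffType[] out = new ArffType[incoming.size()];
        for (int i = 0; i < out.length; i++) {
            out[i] = map(incoming.get(i));
        }
        return out;
    }

    /** True if any field maps to a nominal attribute. */
    static boolean hasNominal(ArffType[] types) {
        for (ArffType t : types) {
            if (t == ArffType.NOMINAL) return true;
        }
        return false;
    }
}
```

A true result from hasNominal corresponds to the situation that m_hasNominalAtts records, which is what forces buffering when streaming (see getBufferingForStreaming).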
-
initializeReservoir
protected void initializeReservoir(int sampleSize, int seed)
Initialize the reservoir/cache for the requested sample (or cache) size.
Parameters:
sampleSize - the number of rows to sample or to cache
seed - the random number seed to use
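The reservoir described here can be illustrated with a standard reservoir sampling scheme (Algorithm R). This stand-alone sketch is an assumption about how m_k, m_currentRow, m_random and m_sample might cooperate; ReservoirSampler and its methods are hypothetical, and KFData's own sampling and caching code may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Hypothetical reservoir sampler (Algorithm R). Field roles mirror
 * m_k, m_random, m_sample and m_currentRow, but this is not KFData's code.
 */
final class ReservoirSampler<T> {
    private final int k;            // size of the sample (cf. m_k)
    private final Random random;    // seeded generator (cf. m_random)
    private final List<T> sample;   // sampled rows (cf. m_sample)
    private int currentRow = 0;     // rows seen so far (cf. m_currentRow)

    ReservoirSampler(int sampleSize, long seed) {
        if (sampleSize < 0) throw new IllegalArgumentException("sampleSize must be >= 0");
        this.k = sampleSize;
        this.random = new Random(seed);
        this.sample = new ArrayList<>(sampleSize);
    }

    /** Offers one row to the reservoir. */
    void offer(T row) {
        if (currentRow < k) {
            sample.add(row); // fill the reservoir with the first k rows
        } else if (k > 0) {
            // Keep the new row with probability k / (currentRow + 1),
            // replacing a uniformly chosen existing slot.
            int j = random.nextInt(currentRow + 1);
            if (j < k) sample.set(j, row);
        }
        currentRow++;
    }

    List<T> sample() { return sample; }

    int rowsSeen() { return currentRow; }

    /** Frees the buffered rows (cf. cleanUp()). */
    void clear() {
        sample.clear();
        currentRow = 0;
    }
}
```

With this scheme every row offered so far has the same probability k/n of being in the sample, and a fixed seed makes the sample reproducible from run to run.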
-
setInjectFieldIndexes
protected void setInjectFieldIndexes(int[] injectFieldIndexes, org.pentaho.dm.commons.ArffMeta[] arffMetas)
Set the indexes of the fields to inject into the knowledge flow.
Parameters:
injectFieldIndexes - array of indexes
arffMetas - array of ARFF metas
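m_injectFieldIndexes maps each injected field to its position in the incoming Kettle row. A minimal sketch of building and applying such a mapping follows, assuming fields are matched by name; FieldIndexMapper, resolve and project are hypothetical names, and plain Object[] rows stand in for Kettle rows.

```java
import java.util.List;

/** Hypothetical helper illustrating the role of m_injectFieldIndexes. */
final class FieldIndexMapper {
    private FieldIndexMapper() {}

    /** Finds the position of each requested field in the incoming row layout. */
    static int[] resolve(List<String> incomingFieldNames, List<String> fieldsToInject) {
        int[] indexes = new int[fieldsToInject.size()];
        for (int i = 0; i < fieldsToInject.size(); i++) {
            int idx = incomingFieldNames.indexOf(fieldsToInject.get(i));
            if (idx < 0) {
                throw new IllegalArgumentException(
                    "Field not found in incoming data: " + fieldsToInject.get(i));
            }
            indexes[i] = idx;
        }
        return indexes;
    }

    /** Copies the mapped values out of an incoming row, in inject order. */
    static Object[] project(Object[] inputRow, int[] indexes) {
        Object[] out = new Object[indexes.length];
        for (int i = 0; i < indexes.length; i++) {
            out[i] = inputRow[indexes[i]];
        }
        return out;
    }
}
```

Resolving the indexes once, up front, means each row only needs cheap array lookups rather than repeated name searches.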
-
setInjectSetupOK
protected void setInjectSetupOK(boolean ok)
Sets the status of the flow with regard to the step to inject into.
Parameters:
ok - true if the inject step is good to go
-
getInjectSetupOK
protected boolean getInjectSetupOK()
Get the status of the flow with regard to the inject step.
Returns:
true if the flow is good to go
-
setStreamData
protected void setStreamData(boolean streamData)
Set whether the flow is to have data streamed to it (i.e. instance events).
Parameters:
streamData - true if data is to be streamed to the knowledge flow
-
getStreamData
protected boolean getStreamData()
Get whether data is to be streamed to the flow.
Returns:
true if data is to be streamed to the flow
-
setIncrementalStatus
protected void setIncrementalStatus(int status)
Set the status of the current incremental event (FORMAT_AVAILABLE or INSTANCE_AVAILABLE).
Parameters:
status - the status of the current incremental event
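FORMAT_AVAILABLE and INSTANCE_AVAILABLE suggest a two-stage protocol for incremental events: the data format is announced first, and instances follow. The hypothetical tracker below models only that ordering. Its local Status enum stands in for the int constants passed to setIncrementalStatus, and treating the first event as the format announcement is an assumption.

```java
/**
 * Hypothetical model of the order of incremental event statuses.
 * The Status enum stands in for the int constants used by setIncrementalStatus.
 */
final class IncrementalStatusTracker {
    enum Status { FORMAT_AVAILABLE, INSTANCE_AVAILABLE }

    private Status status = Status.FORMAT_AVAILABLE; // cf. m_incrementalStatus

    /** Returns the status to use for the next event, then advances. */
    Status next() {
        Status current = status;
        status = Status.INSTANCE_AVAILABLE; // once the format is announced, only instances follow
        return current;
    }

    /** Starts a new stream, so the format must be announced again. */
    void reset() {
        status = Status.FORMAT_AVAILABLE;
    }
}
```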
-
getBufferingForStreaming
protected boolean getBufferingForStreaming()
Returns true if streaming has been selected and there are nominal attributes in the incoming data. In this case, we have to buffer some data in order to determine nominal values.
Returns:
true if streaming is selected and we will be buffering data (i.e. there are nominal attributes in the incoming data)
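When streaming with nominal attributes, the header (m_streamingHeader) cannot be built until the legal nominal values are known, so some rows must be buffered first. This hypothetical sketch buffers a fixed number of rows, records nominal values per column in first-seen order (cf. m_nominalVals), then releases the buffer and passes later rows straight through. StreamingBuffer is illustrative only; its flush() is only loosely analogous to flushStreamingBuffer, whose exact behavior is not documented here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of buffering rows until nominal values are known.
 * Not KFData's implementation; names only echo its fields.
 */
final class StreamingBuffer {
    private final int bufferSize;                 // rows to cache before the header is fixed
    private final boolean[] nominal;              // which columns are nominal
    private final List<Map<Object, Integer>> nominalVals = new ArrayList<>(); // cf. m_nominalVals
    private final List<Object[]> buffer = new ArrayList<>();
    private final Consumer<Object[]> downstream;  // receives rows once buffering is done
    private boolean bufferingComplete = false;    // cf. m_bufferingComplete

    StreamingBuffer(int bufferSize, boolean[] nominal, Consumer<Object[]> downstream) {
        this.bufferSize = bufferSize;
        this.nominal = nominal.clone();
        this.downstream = downstream;
        for (int i = 0; i < nominal.length; i++) {
            nominalVals.add(new LinkedHashMap<>());
        }
    }

    void processRow(Object[] row) {
        if (bufferingComplete) {
            downstream.accept(row); // header already fixed: pass straight through
            return;
        }
        recordNominals(row);
        buffer.add(row);
        if (buffer.size() >= bufferSize) flush();
    }

    /** Ends buffering (also call at end of input) and releases cached rows in order. */
    void flush() {
        bufferingComplete = true;
        for (Object[] r : buffer) downstream.accept(r);
        buffer.clear();
    }

    private void recordNominals(Object[] row) {
        for (int i = 0; i < nominal.length; i++) {
            if (nominal[i] && row[i] != null) {
                Map<Object, Integer> vals = nominalVals.get(i);
                vals.putIfAbsent(row[i], vals.size()); // index = first-seen order
            }
        }
    }

    /** Legal values learned for column i, in first-seen order. */
    List<Object> legalValues(int i) { return new ArrayList<>(nominalVals.get(i).keySet()); }

    boolean isBufferingComplete() { return bufferingComplete; }
}
```

One consequence of this design is that a nominal value first seen after buffering ends is not part of the header; the sketch simply ignores it, and how KFData handles that case is not specified here.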
-
setClassAttributeName
protected void setClassAttributeName(String classAttName)
Set the name of the attribute to use as the class.
Parameters:
classAttName - the name of the class attribute
-
setOutputRowMeta
protected void setOutputRowMeta(org.pentaho.di.core.row.RowMetaInterface rmi)
Set the meta data for the incoming rows (later modified into the output format by getFields() in the meta class).
Parameters:
rmi - the incoming row meta data
-
getOutputRowMeta
protected org.pentaho.di.core.row.RowMetaInterface getOutputRowMeta()
Get the output row meta data.
Returns:
the output row meta data
-
setSampleRelationName
protected void setSampleRelationName(String relationName)
Set the relation name for the sample.
Parameters:
relationName - the relation name to use for the sample
-
setInjectStep
protected void setInjectStep(weka.gui.beans.BeanInstance injectStep)
Set the knowledge flow step to inject data into.
Parameters:
injectStep - the knowledge flow step to inject data into
-
setInjectEventName
protected void setInjectEventName(String injectEventName)
Set the name of the event to use for injecting data.
Parameters:
injectEventName - the name of the event to inject data with
-
loadSerializedKF
protected static Vector<Vector<?>> loadSerializedKF(File kfFile) throws Exception
Loads a serialized knowledge flow.
Parameters:
kfFile - a File value
Returns:
a Vector containing a Vector of beans and a Vector of connections
Throws:
Exception - if there is a problem during loading
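A serialized flow is described as a Vector holding a Vector of beans and a Vector of connections. Assuming plain Java object serialization of that structure, a round trip could look like the sketch below. SerializedFlowIO is hypothetical, and placeholder strings stand in for beans; real .kf files contain Weka bean classes, which must be on the classpath to deserialize.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Vector;

/** Hypothetical sketch of saving/loading a [beans, connections] flow via Java serialization. */
final class SerializedFlowIO {
    private SerializedFlowIO() {}

    static void save(File file, Vector<?> beans, Vector<?> connections) throws IOException {
        Vector<Vector<?>> flow = new Vector<>();
        flow.add(beans);        // element 0: the beans
        flow.add(connections);  // element 1: the connections
        try (ObjectOutputStream out = new ObjectOutputStream(
                new BufferedOutputStream(new FileOutputStream(file)))) {
            out.writeObject(flow);
        }
    }

    @SuppressWarnings("unchecked")
    static Vector<Vector<?>> load(File file) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(
                new BufferedInputStream(new FileInputStream(file)))) {
            Object obj = in.readObject();
            if (!(obj instanceof Vector)) {
                throw new IOException("Not a serialized flow: " + file);
            }
            Vector<?> v = (Vector<?>) obj;
            // Expect exactly two inner Vectors: beans, then connections.
            if (v.size() != 2 || !(v.get(0) instanceof Vector) || !(v.get(1) instanceof Vector)) {
                throw new IOException("Expected [beans, connections] in " + file);
            }
            return (Vector<Vector<?>>) v;
        }
    }
}
```

Java deserialization can instantiate arbitrary classes, so a loader like this should only be pointed at trusted flow files.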
-
flowToXML
protected static String flowToXML(Vector<Vector<?>> flow) throws Exception
Throws:
Exception
-
xmlToFlow
protected static Vector<Vector<?>> xmlToFlow(String xml) throws Exception
Throws:
Exception
-
reservoirToInstances
protected weka.core.Instances reservoirToInstances(org.pentaho.di.core.row.RowMetaInterface inputRowMeta) throws org.pentaho.di.core.exception.KettleException
Convert the contents of the reservoir into a set of Instances.
Parameters:
inputRowMeta - the meta data for the incoming rows
Returns:
an Instances object holding all the data in the reservoir
Throws:
org.pentaho.di.core.exception.KettleException - if there is a problem during the conversion
-
injectInstance
protected void injectInstance(weka.core.Instance inst)
Inject a single instance into the knowledge flow. Used in streaming setups.
Parameters:
inst - the instance to inject
-
injectData
protected void injectData(weka.core.Instances data, Vector beans) throws Exception
Inject batch data into the knowledge flow.
Parameters:
data - the instances to inject
beans - the flow being executed
Throws:
Exception
-
processRow
protected void processRow(Object[] inputRow, org.pentaho.di.core.row.RowMetaInterface inputMeta) throws Exception
Processes a single incoming row. Depending on the setup, this may entail checking whether this row should go into the reservoir/cache or be passed directly on to the knowledge flow.
Parameters:
inputRow - the incoming Kettle row
inputMeta - the row meta data
Throws:
Exception - if something goes wrong
-
flushStreamingBuffer
public void flushStreamingBuffer(org.pentaho.di.core.row.RowMetaInterface inputMeta, Object[] inputRow) throws org.pentaho.di.core.exception.KettleException
Throws:
org.pentaho.di.core.exception.KettleException
-
cleanUp
public void cleanUp()
Free memory used by the reservoir/cache
-
getAllAllowableOutputStepNames
protected static ArrayList<String> getAllAllowableOutputStepNames(Vector flow)
Return a list of all valid output steps in the flow (i.e. those that produce at least one event that we can accept).
Parameters:
flow - the flow to use
Returns:
a list of valid output steps
-
getAllAllowableOutputEventNames
protected static ArrayList<String> getAllAllowableOutputEventNames(Vector flow, String compName)
Get a list of all event names that we can accept from the supplied knowledge flow component.
Parameters:
flow - the flow to use
compName - the name of the component to generate the list for
Returns:
the list of event names that we can accept from the supplied component
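getAllAllowableOutputEventNames and getAllAllowableOutputStepNames both filter flow components by the events they produce. The sketch below shows that filtering on a simplified model. OutputEventFinder and its Step class are hypothetical, and the OUTPUT_EVENTS values are assumed placeholders rather than the actual contents of KFData.OUTPUT_EVENTS.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of filtering flow components by the events they produce. */
final class OutputEventFinder {
    /** Simplified model of a knowledge flow component. */
    static final class Step {
        final String name;
        final Set<String> producedEvents;

        Step(String name, String... producedEvents) {
            this.name = name;
            this.producedEvents = new LinkedHashSet<>(Arrays.asList(producedEvents));
        }
    }

    // Assumed placeholder values, not copied from KFData.OUTPUT_EVENTS.
    static final Set<String> OUTPUT_EVENTS =
        new LinkedHashSet<>(Arrays.asList("dataSet", "instance", "text"));

    private OutputEventFinder() {}

    /** Events produced by the named component that we can listen for. */
    static List<String> allowableOutputEventNames(List<Step> flow, String compName) {
        List<String> result = new ArrayList<>();
        for (Step s : flow) {
            if (s.name.equals(compName)) {
                for (String e : s.producedEvents) {
                    if (OUTPUT_EVENTS.contains(e)) result.add(e);
                }
                break; // first component with that name only
            }
        }
        return result;
    }

    /** Components producing at least one event we can accept. */
    static List<String> allowableOutputStepNames(List<Step> flow) {
        List<String> result = new ArrayList<>();
        for (Step s : flow) {
            if (!Collections.disjoint(s.producedEvents, OUTPUT_EVENTS)) result.add(s.name);
        }
        return result;
    }
}
```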
-
getAllInjectStepNames
protected static ArrayList<String> getAllInjectStepNames(Vector flow)
Look for all knowledge flow components that are of type "KettleInject".
Parameters:
flow - the flow to use
Returns:
an ArrayList of the names of knowledge flow components that are of type KettleInject
-
getEventsForNamedStep
protected static ArrayList<String> getEventsForNamedStep(Vector flow, String compName)
Looks for the named step, and then returns all events (from those that we can generate) that this step can accept at present.
Parameters:
flow - the flow to use
compName - the knowledge flow step in question
Returns:
an ArrayList of the acceptable events (connections)
-
validateOutputSetup
protected static void validateOutputSetup(Vector flow, Object outputListener, String outputStepName, String outputEventName) throws org.pentaho.di.core.exception.KettleStepException
Attempts to find the requested output step, checks that the requested output event type can be generated, and then tries to register the supplied listener with the output step.
Parameters:
flow - a Vector containing a Vector of beans and a Vector of connections
outputListener - the object to register with the output step (may be null if just validation is required)
outputStepName - the name of the output Knowledge Flow step
outputEventName - the name of the event produced by the output step to listen for
Throws:
org.pentaho.di.core.exception.KettleStepException - if a problem occurs
-
validateInjectSetup
protected static void validateInjectSetup(Vector flow, String injectStepName, String injectEventName) throws org.pentaho.di.core.exception.KettleStepException
Attempts to find the requested inject step and checks whether the requested event type is one that this step generates (and will allow a connection).
Parameters:
flow - the flow to use
injectStepName - the name of the Knowledge Flow step to inject data into
injectEventName - the name of the event type to use to pass data to the inject step
Throws:
org.pentaho.di.core.exception.KettleStepException - if validation fails for some reason
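The checks described for validateInjectSetup can be sketched as three conditions: the event must be one we can inject, the named step must exist, and the step must generate that event. In this hypothetical sketch IllegalStateException stands in for KettleStepException, findStep plays a role similar to findKFStep, and the INJECT_EVENTS values are assumed placeholders.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch of inject-setup validation.
 * IllegalStateException stands in for KettleStepException.
 */
final class InjectValidator {
    /** Simplified model of a knowledge flow component. */
    static final class Step {
        final String name;
        final Set<String> generatedEvents;

        Step(String name, String... generatedEvents) {
            this.name = name;
            this.generatedEvents = new LinkedHashSet<>(Arrays.asList(generatedEvents));
        }
    }

    // Assumed placeholder values, not copied from KFData.INJECT_EVENTS.
    static final List<String> INJECT_EVENTS = Arrays.asList("dataSet", "instance");

    private InjectValidator() {}

    /** Returns the first step with the given name, or null if none exists. */
    static Step findStep(List<Step> flow, String stepName) {
        for (Step s : flow) {
            if (s.name.equals(stepName)) return s;
        }
        return null;
    }

    static void validateInjectSetup(List<Step> flow, String injectStepName, String injectEventName) {
        if (!INJECT_EVENTS.contains(injectEventName)) {
            throw new IllegalStateException("Unsupported inject event: " + injectEventName);
        }
        Step step = findStep(flow, injectStepName);
        if (step == null) {
            throw new IllegalStateException("No step named " + injectStepName + " in the flow");
        }
        if (!step.generatedEvents.contains(injectEventName)) {
            throw new IllegalStateException(
                injectStepName + " does not generate " + injectEventName + " events");
        }
    }
}
```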