Class KFData

java.lang.Object
org.pentaho.di.trans.step.BaseStepData
org.pentaho.di.kf.KFData
All Implemented Interfaces:
org.pentaho.di.trans.step.StepDataInterface

public class KFData extends org.pentaho.di.trans.step.BaseStepData implements org.pentaho.di.trans.step.StepDataInterface
Holds temporary data and various utility routines for loading serialized knowledge flows etc.
Version:
$Revision: 43074 $
Author:
Mark Hall (mhall{[at]}pentaho{[dot]}Com)
  • Nested Class Summary

    Nested classes/interfaces inherited from class org.pentaho.di.trans.step.BaseStepData

    org.pentaho.di.trans.step.BaseStepData.StepExecutionStatus
  • Field Summary

    Fields
    Modifier and Type
    Field
    Description
    static final String[]
    INJECT_EVENTS
    Allowable types of events to generate to inject data into the knowledge flow
    protected boolean
    m_bufferingComplete
    True when buffering for header creation has been completed for streaming setups
    protected String
    m_classAttributeName
    The name of the class attribute (if any).
    protected int
    m_currentRow
    the current row number
    protected boolean
    m_hasNominalAtts
    True if incoming data has nominal attributes
    protected int
    m_incrementalStatus
    Status to use for incremental instance events
    protected org.pentaho.dm.commons.ArffMeta[]
    m_injectArffMetas
    Meta data for the fields to be injected into the knowledge flow
    protected String
    m_injectEventName
    The name of the event to use for injecting (must be instance for streaming)
    protected int[]
    m_injectFieldIndexes
    Mapping to fields in incoming kettle data
    protected org.pentaho.dm.commons.ArffMeta[]
    m_injectFields
    Meta data for the ARFF instances input to the inject step
    protected boolean
    m_injectSetupOK
    If the inject setup validates, then we can process rows for input into the flow
    protected weka.gui.beans.BeanInstance
    m_injectStep
    The step in the knowledge flow to inject into
    protected int
    m_k
    the size of the sample
    protected org.pentaho.di.core.logging.LogChannelInterface
    m_log
    logging
    protected Map<Object,Object>[]
    m_nominalVals
    Maps to hold the nominal values of the ARFF data for the inject step
    protected org.pentaho.di.core.row.RowMetaInterface
    m_outputRowMeta
    The (eventual) outgoing row meta data
    protected Random
    m_random
    random number generator
    protected List<Object[]>
    m_sample
    Holds sampled rows
    protected String
    m_sampleRelationName
    Relation name for Instances created from the sampled rows
    protected boolean
    m_streamData
    True if data is to be injected one instance at a time
    protected weka.core.Instances
    m_streamingHeader
    Header for instances that are to be streamed.
    static final String[]
    OUTPUT_EVENTS
    Allowable types of events that we can listen for from the knowledge flow
  • Constructor Summary

    Constructors
    Constructor
    Description
     
  • Method Summary

    Modifier and Type
    Method
    Description
    protected void
    allocate(int num)
    Allocate an array to hold meta data for the ARFF instances
    void
    cleanUp()
    Free memory used by the reservoir/cache
    protected static weka.gui.beans.BeanInstance
    findKFStep(Vector flow, String stepName)
     
    protected static String
    flowToXML(Vector<Vector<?>> flow)
     
    void
    flushStreamingBuffer(org.pentaho.di.core.row.RowMetaInterface inputMeta, Object[] inputRow)
     
    protected static ArrayList<String>
    getAllAllowableOutputEventNames(Vector flow, String compName)
    Get a list of all event names that we can accept from the supplied knowledge flow component.
    protected static ArrayList<String>
    getAllAllowableOutputStepNames(Vector flow)
    Return a list of all valid output steps in the flow (i.e. those that produce at least one event that we can accept).
    protected static ArrayList<String>
    getAllInjectStepNames(Vector flow)
    Look for all knowledge flow components that are of type "KettleInject"
    protected boolean
    getBufferingForStreaming()
    Returns true if streaming has been selected and there are nominal attributes in the incoming data.
    protected static ArrayList<String>
    getEventsForNamedStep(Vector flow, String compName)
    Looks for the named step, and then returns all events (from those that we can generate) that this step can accept at present
    org.pentaho.dm.commons.ArffMeta[]
    getInjectFields()
     
    protected boolean
    getInjectSetupOK()
    Get the status of the flow with regards to the inject step.
    protected org.pentaho.di.core.row.RowMetaInterface
    getOutputRowMeta()
    Get the output row meta data.
    protected boolean
    getStreamData()
    Get whether data is to be streamed to the flow.
    protected void
    initializeReservoir(int sampleSize, int seed)
    Initialize the reservoir/cache for the requested sample (or cache) size.
    protected void
    injectData(weka.core.Instances data, Vector beans)
    Inject batch data into the knowledge flow.
    protected void
    injectInstance(weka.core.Instance inst)
    Inject a single instance into the knowledge flow.
    protected static Vector<Vector<?>>
    loadSerializedKF(File kfFile)
    Loads a serialized knowledge flow
    protected void
    processRow(Object[] inputRow, org.pentaho.di.core.row.RowMetaInterface inputMeta)
    Processes a single incoming row.
    protected weka.core.Instances
    reservoirToInstances(org.pentaho.di.core.row.RowMetaInterface inputRowMeta)
    Convert the contents of the reservoir into a set of Instances.
    protected void
    setClassAttributeName(String classAttName)
    Set the name of the attribute to use as the class.
    protected void
    setIncrementalStatus(int status)
    Set the status of the current incremental event (FORMAT_AVAILABLE or INSTANCE_AVAILABLE).
    protected void
    setInjectEventName(String injectEventName)
    Set the name of the event to inject data using.
    protected void
    setInjectFieldIndexes(int[] injectFieldIndexes, org.pentaho.dm.commons.ArffMeta[] arffMetas)
    Set the indexes of the fields to inject into the knowledge flow
    void
    setInjectFields(org.pentaho.dm.commons.ArffMeta[] arffMeta)
     
    protected void
    setInjectSetupOK(boolean ok)
    Sets the status of the flow with regards to the step to inject into.
    protected void
    setInjectStep(weka.gui.beans.BeanInstance injectStep)
    Set the knowledge flow step to inject data into
    void
    setLog(org.pentaho.di.core.logging.LogChannelInterface log)
     
    protected void
    setOutputRowMeta(org.pentaho.di.core.row.RowMetaInterface rmi)
    Set the meta data for the incoming rows (this later gets modified into the output format by getFields() in the meta class)
    protected void
    setSampleRelationName(String relationName)
    Set the relation name for the sample.
    protected void
    setStreamData(boolean streamData)
    Set whether the flow is to have data streamed to it (i.e. instance events).
    protected void
    setupArffMeta(org.pentaho.di.core.row.RowMetaInterface rmi)
    Sets up the ArffMeta array based on the incoming Kettle row format.
    protected static void
    validateInjectSetup(Vector flow, String injectStepName, String injectEventName)
    Attempts to find the requested inject step and check if the requested event type is one that this step generates (and will allow a connection).
    protected static void
    validateOutputSetup(Vector flow, Object outputListener, String outputStepName, String outputEventName)
    Attempts to find the requested output step, checks that the requested output event type is generatable and then tries to register the supplied listener with the output step.
    protected static Vector<Vector<?>>
    xmlToFlow(String xml)
     

    Methods inherited from class org.pentaho.di.trans.step.BaseStepData

    getStatus, isDisposed, isEmpty, isFinished, isIdle, isInitialising, isRunning, isStopped, setStatus

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

    Methods inherited from interface org.pentaho.di.trans.step.StepDataInterface

    getStatus, isDisposed, isEmpty, isFinished, isIdle, isInitialising, isRunning, setStatus
  • Field Details

    • m_outputRowMeta

      protected org.pentaho.di.core.row.RowMetaInterface m_outputRowMeta
      The (eventual) outgoing row meta data
    • m_injectFields

      protected org.pentaho.dm.commons.ArffMeta[] m_injectFields
      Meta data for the ARFF instances input to the inject step
    • m_injectSetupOK

      protected boolean m_injectSetupOK
      If the inject setup validates, then we can process rows for input into the flow
    • m_injectArffMetas

      protected org.pentaho.dm.commons.ArffMeta[] m_injectArffMetas
      Meta data for the fields to be injected into the knowledge flow
    • m_injectFieldIndexes

      protected int[] m_injectFieldIndexes
      Mapping to fields in incoming kettle data
    • m_hasNominalAtts

      protected boolean m_hasNominalAtts
      True if incoming data has nominal attributes
    • m_streamData

      protected boolean m_streamData
      True if data is to be injected one instance at a time
    • m_incrementalStatus

      protected int m_incrementalStatus
      Status to use for incremental instance events
    • m_streamingHeader

      protected weka.core.Instances m_streamingHeader
      Header for instances that are to be streamed. If there are no nominal attributes in the incoming data, then this will get constructed on the first incoming row. Otherwise, it will be constructed after we have seen x rows (where x is the number of rows to cache in order to determine legal nominal values).
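The buffering step described above can be pictured with a minimal, library-free sketch. It assumes the buffered rows are plain Object[] arrays and that the legal values of a nominal column are simply the distinct non-null values seen in the buffer. The class and method names (NominalValueSketch, legalValues) are hypothetical and are not part of KFData, and the sorted ordering is an assumption made only so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class NominalValueSketch {

    /**
     * Collects the distinct non-null values seen in one column of the
     * buffered rows. Sorting is an assumption of this sketch, not a
     * documented behaviour of KFData.
     */
    static List<String> legalValues(List<Object[]> bufferedRows, int column) {
        Set<String> seen = new TreeSet<>();
        for (Object[] row : bufferedRows) {
            Object value = row[column];
            if (value != null) { // null is treated as a missing value
                seen.add(value.toString());
            }
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        List<Object[]> buffer = new ArrayList<>();
        buffer.add(new Object[] {1.0, "red"});
        buffer.add(new Object[] {2.0, "blue"});
        buffer.add(new Object[] {3.0, null});
        buffer.add(new Object[] {4.0, "red"});
        System.out.println(legalValues(buffer, 1)); // prints [blue, red]
    }
}
```

A value that first appears after the buffer has been flushed would not be in the collected list, which is why the number of rows to cache matters in streaming setups.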
    • m_classAttributeName

      protected String m_classAttributeName
      The name of the class attribute (if any).
    • m_bufferingComplete

      protected boolean m_bufferingComplete
      True when buffering for header creation has been completed for streaming setups
    • m_injectStep

      protected weka.gui.beans.BeanInstance m_injectStep
      The step in the knowledge flow to inject into
    • m_injectEventName

      protected String m_injectEventName
      The name of the event to use for injecting (must be instance for streaming)
    • m_nominalVals

      protected Map<Object,Object>[] m_nominalVals
      Maps to hold the nominal values of the ARFF data for the inject step
    • m_k

      protected int m_k
      the size of the sample
    • m_currentRow

      protected int m_currentRow
      the current row number
    • m_sample

      protected List<Object[]> m_sample
      Holds sampled rows
    • m_sampleRelationName

      protected String m_sampleRelationName
      Relation name for Instances created from the sampled rows
    • m_random

      protected Random m_random
      random number generator
    • m_log

      protected org.pentaho.di.core.logging.LogChannelInterface m_log
      logging
    • INJECT_EVENTS

      public static final String[] INJECT_EVENTS
      Allowable types of events to generate to inject data into the knowledge flow
    • OUTPUT_EVENTS

      public static final String[] OUTPUT_EVENTS
      Allowable types of events that we can listen for from the knowledge flow
  • Constructor Details

    • KFData

      public KFData()
  • Method Details

    • setLog

      public void setLog(org.pentaho.di.core.logging.LogChannelInterface log)
    • setInjectFields

      public void setInjectFields(org.pentaho.dm.commons.ArffMeta[] arffMeta)
    • getInjectFields

      public org.pentaho.dm.commons.ArffMeta[] getInjectFields()
    • allocate

      protected void allocate(int num)
      Allocate an array to hold meta data for the ARFF instances
      Parameters:
      num - number of meta data objects to allocate
    • setupArffMeta

      protected void setupArffMeta(org.pentaho.di.core.row.RowMetaInterface rmi)
      Sets up the ArffMeta array based on the incoming Kettle row format.
      Parameters:
      rmi - a RowMetaInterface value
    • initializeReservoir

      protected void initializeReservoir(int sampleSize, int seed)
      Initialize the reservoir/cache for the requested sample (or cache) size.
      Parameters:
      sampleSize - the number of rows to sample or to cache
      seed - the random number seed to use
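For readers unfamiliar with the term, the following is a minimal sketch of classic reservoir sampling (Algorithm R) using a sample size, a seeded Random, and a running row counter, mirroring the m_k, m_random, m_sample and m_currentRow fields. This page does not state which sampling scheme KFData uses, so treat the logic as an assumption; ReservoirSketch, offer and rows are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class ReservoirSketch {
    private final int k;                 // requested sample size
    private final Random random;         // seeded for repeatability
    private final List<Object[]> sample; // the reservoir itself
    private int currentRow = 0;          // number of rows seen so far

    ReservoirSketch(int sampleSize, int seed) {
        this.k = sampleSize;
        this.random = new Random(seed);
        this.sample = new ArrayList<>(sampleSize);
    }

    /** Offers one row; every row seen so far is kept with equal probability. */
    void offer(Object[] row) {
        if (currentRow < k) {
            sample.add(row); // fill phase: keep the first k rows
        } else {
            int j = random.nextInt(currentRow + 1); // uniform in [0, currentRow]
            if (j < k) {
                sample.set(j, row); // replace a randomly chosen slot
            }
        }
        currentRow++;
    }

    List<Object[]> rows() {
        return sample;
    }

    public static void main(String[] args) {
        ReservoirSketch reservoir = new ReservoirSketch(5, 42);
        for (int i = 0; i < 100; i++) {
            reservoir.offer(new Object[] {i});
        }
        System.out.println(reservoir.rows().size()); // prints 5
    }
}
```

If fewer rows arrive than the requested size, the reservoir simply holds all of them, which matches the "sample (or cache)" wording of the description.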
    • setInjectFieldIndexes

      protected void setInjectFieldIndexes(int[] injectFieldIndexes, org.pentaho.dm.commons.ArffMeta[] arffMetas)
      Set the indexes of the fields to inject into the knowledge flow
      Parameters:
      injectFieldIndexes - array of indexes
      arffMetas - array of arff metas
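As an illustration of what an index mapping of this kind does, the sketch below projects an incoming row onto the positions listed in an index array. It assumes each entry of injectFieldIndexes is a valid position in the incoming row; FieldIndexSketch and project are hypothetical names and do not appear in KFData.

```java
import java.util.Arrays;

public class FieldIndexSketch {

    /** Picks the values at the given positions, in the order the indexes are listed. */
    static Object[] project(Object[] inputRow, int[] injectFieldIndexes) {
        Object[] out = new Object[injectFieldIndexes.length];
        for (int i = 0; i < injectFieldIndexes.length; i++) {
            out[i] = inputRow[injectFieldIndexes[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] row = {"id-7", 3.5, "yes", 42L};
        int[] indexes = {2, 1}; // inject the third field, then the second
        System.out.println(Arrays.toString(project(row, indexes))); // prints [yes, 3.5]
    }
}
```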
    • setInjectSetupOK

      protected void setInjectSetupOK(boolean ok)
      Sets the status of the flow with regards to the step to inject into.
      Parameters:
      ok - true if the inject step is good to go
    • getInjectSetupOK

      protected boolean getInjectSetupOK()
      Get the status of the flow with regards to the inject step.
      Returns:
      true if the flow is good to go
    • setStreamData

      protected void setStreamData(boolean streamData)
      Set whether the flow is to have data streamed to it (i.e. instance events).
      Parameters:
      streamData - true if data is to be streamed to the knowledge flow
    • getStreamData

      protected boolean getStreamData()
      Get whether data is to be streamed to the flow.
      Returns:
      true if data is to be streamed to the flow
    • setIncrementalStatus

      protected void setIncrementalStatus(int status)
      Set the status of the current incremental event (FORMAT_AVAILABLE or INSTANCE_AVAILABLE).
      Parameters:
      status - the status of the current incremental event
    • getBufferingForStreaming

      protected boolean getBufferingForStreaming()
      Returns true if streaming has been selected and there are nominal attributes in the incoming data. In this case, we have to buffer some data in order to determine nominal values.
      Returns:
      true if streaming is selected and we will be buffering data (i.e. there are nominal attributes in the incoming data)
    • setClassAttributeName

      protected void setClassAttributeName(String classAttName)
      Set the name of the attribute to use as the class.
      Parameters:
      classAttName - the name of the class attribute
    • setOutputRowMeta

      protected void setOutputRowMeta(org.pentaho.di.core.row.RowMetaInterface rmi)
      Set the meta data for the incoming rows (this later gets modified into the output format by getFields() in the meta class)
      Parameters:
      rmi - the incoming row meta data
    • getOutputRowMeta

      protected org.pentaho.di.core.row.RowMetaInterface getOutputRowMeta()
      Get the output row meta data.
      Returns:
      the output row meta data
    • setSampleRelationName

      protected void setSampleRelationName(String relationName)
      Set the relation name for the sample.
      Parameters:
      relationName - the relation name to use for the sample
    • setInjectStep

      protected void setInjectStep(weka.gui.beans.BeanInstance injectStep)
      Set the knowledge flow step to inject data into
      Parameters:
      injectStep - the knowledge flow step to inject data into
    • setInjectEventName

      protected void setInjectEventName(String injectEventName)
      Set the name of the event to inject data using.
      Parameters:
      injectEventName - the name of the event to inject data with
    • loadSerializedKF

      protected static Vector<Vector<?>> loadSerializedKF(File kfFile) throws Exception
      Loads a serialized knowledge flow
      Parameters:
      kfFile - a File value
      Returns:
      a Vector containing a Vector of beans and a Vector of connections
      Throws:
      Exception - if there is a problem during loading
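The returned structure (an outer Vector holding a Vector of beans and a Vector of connections) can be illustrated with a round trip through standard Java object serialization. This is only a sketch: the page does not confirm the on-disk format that loadSerializedKF reads, and the strings below stand in for real knowledge flow beans. FlowSerializationSketch, save and load are hypothetical names.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Vector;

public class FlowSerializationSketch {

    /** Writes the two-part flow structure with plain Java serialization. */
    static void save(Vector<Vector<?>> flow, File file) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(
                new BufferedOutputStream(new FileOutputStream(file)))) {
            out.writeObject(flow);
        }
    }

    /** Reads the structure back; element 0 holds beans, element 1 holds connections. */
    @SuppressWarnings("unchecked")
    static Vector<Vector<?>> load(File file) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(
                new BufferedInputStream(new FileInputStream(file)))) {
            return (Vector<Vector<?>>) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Vector<String> beans = new Vector<>();
        beans.add("KettleInject"); // placeholder for a real bean instance
        beans.add("Classifier");   // placeholder for a real bean instance
        Vector<String> connections = new Vector<>();

        Vector<Vector<?>> flow = new Vector<>();
        flow.add(beans);
        flow.add(connections);

        File tmp = File.createTempFile("flow", ".ser");
        tmp.deleteOnExit();
        save(flow, tmp);

        Vector<Vector<?>> loaded = load(tmp);
        System.out.println(loaded.size() + " " + loaded.get(0)); // prints 2 [KettleInject, Classifier]
    }
}
```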
    • flowToXML

      protected static String flowToXML(Vector<Vector<?>> flow) throws Exception
      Throws:
      Exception
    • xmlToFlow

      protected static Vector<Vector<?>> xmlToFlow(String xml) throws Exception
      Throws:
      Exception
    • reservoirToInstances

      protected weka.core.Instances reservoirToInstances(org.pentaho.di.core.row.RowMetaInterface inputRowMeta) throws org.pentaho.di.core.exception.KettleException
      Convert the contents of the reservoir into a set of Instances.
      Parameters:
      inputRowMeta - the meta data for the incoming rows
      Returns:
      an Instances object holding all the data in the reservoir
      Throws:
      org.pentaho.di.core.exception.KettleException - if there is a problem during the conversion
    • injectInstance

      protected void injectInstance(weka.core.Instance inst)
      Inject a single instance into the knowledge flow. Used in streaming setups.
      Parameters:
      inst - the instance to inject
    • injectData

      protected void injectData(weka.core.Instances data, Vector beans) throws Exception
      Inject batch data into the knowledge flow.
      Parameters:
      data - the instances to inject
      beans - the flow being executed
      Throws:
      Exception
    • processRow

      protected void processRow(Object[] inputRow, org.pentaho.di.core.row.RowMetaInterface inputMeta) throws Exception
      Processes a single incoming row. Depending on the setup, this may entail checking to see if this row should go into the reservoir/cache or be passed directly on to the knowledge flow.
      Parameters:
      inputRow - the incoming Kettle row
      inputMeta - the row meta data
      Throws:
      Exception - if something goes wrong
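One way to read the description above is as a three-way routing decision driven by the m_streamData, m_hasNominalAtts and m_bufferingComplete flags. The sketch below encodes that reading. The exact rules used by processRow are not spelled out on this page, so the branch conditions are assumptions, and RowRoutingSketch, Route and route are hypothetical names.

```java
public class RowRoutingSketch {

    /** Where a row might go, under the assumptions stated above. */
    enum Route { INJECT_INSTANCE, BUFFER_FOR_HEADER, RESERVOIR }

    static Route route(boolean streamData, boolean hasNominalAtts, boolean bufferingComplete) {
        if (!streamData) {
            return Route.RESERVOIR; // batch setup: sample or cache the row
        }
        if (hasNominalAtts && !bufferingComplete) {
            return Route.BUFFER_FOR_HEADER; // nominal values are still being collected
        }
        return Route.INJECT_INSTANCE; // header is known: pass the row straight through
    }

    public static void main(String[] args) {
        System.out.println(route(false, true, false)); // prints RESERVOIR
        System.out.println(route(true, true, false));  // prints BUFFER_FOR_HEADER
        System.out.println(route(true, false, false)); // prints INJECT_INSTANCE
    }
}
```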
    • flushStreamingBuffer

      public void flushStreamingBuffer(org.pentaho.di.core.row.RowMetaInterface inputMeta, Object[] inputRow) throws org.pentaho.di.core.exception.KettleException
      Throws:
      org.pentaho.di.core.exception.KettleException
    • cleanUp

      public void cleanUp()
      Free memory used by the reservoir/cache
    • findKFStep

      protected static weka.gui.beans.BeanInstance findKFStep(Vector flow, String stepName)
    • getAllAllowableOutputStepNames

      protected static ArrayList<String> getAllAllowableOutputStepNames(Vector flow)
      Return a list of all valid output steps in the flow (i.e. those that produce at least one event that we can accept).
      Parameters:
      flow - the flow to use
      Returns:
      a list of valid output steps
    • getAllAllowableOutputEventNames

      protected static ArrayList<String> getAllAllowableOutputEventNames(Vector flow, String compName)
      Get a list of all event names that we can accept from the supplied knowledge flow component.
      Parameters:
      flow - the flow to use
      compName - the name of the component to generate the list for
      Returns:
      the list of event names that we can accept from the supplied component
    • getAllInjectStepNames

      protected static ArrayList<String> getAllInjectStepNames(Vector flow)
      Look for all knowledge flow components that are of type "KettleInject"
      Parameters:
      flow - the flow to use
      Returns:
      an ArrayList of the names of knowledge flow components that are of type KettleInject
    • getEventsForNamedStep

      protected static ArrayList<String> getEventsForNamedStep(Vector flow, String compName)
      Looks for the named step, and then returns all events (from those that we can generate) that this step can accept at present
      Parameters:
      flow - the flow to use
      compName - the knowledge flow step in question
      Returns:
      an ArrayList of the acceptable events (connections)
    • validateOutputSetup

      protected static void validateOutputSetup(Vector flow, Object outputListener, String outputStepName, String outputEventName) throws org.pentaho.di.core.exception.KettleStepException
      Attempts to find the requested output step, checks that the requested output event type is generatable and then tries to register the supplied listener with the output step.
      Parameters:
      flow - a Vector containing a Vector of beans and a Vector of connections
      outputListener - the object to register with the output step (may be null if just validation is required)
      outputStepName - the name of the output Knowledge Flow step
      outputEventName - the name of the event produced by the output step to listen for
      Throws:
      org.pentaho.di.core.exception.KettleStepException - if a problem occurs
    • validateInjectSetup

      protected static void validateInjectSetup(Vector flow, String injectStepName, String injectEventName) throws org.pentaho.di.core.exception.KettleStepException
      Attempts to find the requested inject step and check if the requested event type is one that this step generates (and will allow a connection).
      Parameters:
      flow - the flow to use
      injectStepName - the name of the Knowledge Flow step to inject data into
      injectEventName - the name of the event type to use to pass data to the inject step
      Throws:
      org.pentaho.di.core.exception.KettleStepException - if validation fails for some reason