Class KFData

  • All Implemented Interfaces:
    org.pentaho.di.trans.step.StepDataInterface

    public class KFData
    extends org.pentaho.di.trans.step.BaseStepData
    implements org.pentaho.di.trans.step.StepDataInterface
    Holds temporary data and various utility routines for loading serialized knowledge flows etc.
    Version:
    $Revision: 43074 $
    Author:
    Mark Hall (mhall{[at]}pentaho{[dot]}Com)
    • Nested Class Summary

      • Nested classes/interfaces inherited from class org.pentaho.di.trans.step.BaseStepData

        org.pentaho.di.trans.step.BaseStepData.StepExecutionStatus
    • Field Summary

      Fields 
      Modifier and Type Field Description
      static String[] INJECT_EVENTS
      Allowable types of events to generate to inject data into the knowledge flow
      protected boolean m_bufferingComplete
      True when buffering for header creation has been completed for streaming setups
      protected String m_classAttributeName
      The name of the class attribute (if any).
      protected int m_currentRow
      the current row number
      protected boolean m_hasNominalAtts
      True if incoming data has nominal attributes
      protected int m_incrementalStatus
      Status to use for incremental instance events
      protected org.pentaho.dm.commons.ArffMeta[] m_injectArffMetas
      Meta data for the fields to be injected into the knowledge flow
      protected String m_injectEventName
      The name of the event to use for injecting (must be an instance event for streaming)
      protected int[] m_injectFieldIndexes
      Mapping to fields in incoming kettle data
      protected org.pentaho.dm.commons.ArffMeta[] m_injectFields
      Meta data for the ARFF instances input to the inject step
      protected boolean m_injectSetupOK
      If the inject setup validates, then we can process rows for input into the flow
      protected weka.gui.beans.BeanInstance m_injectStep
      The step in the knowledge flow to inject into
      protected int m_k
      the size of the sample
      protected org.pentaho.di.core.logging.LogChannelInterface m_log
      The log channel used for logging
      protected Map<Object,Object>[] m_nominalVals
      Maps to hold the nominal values of the ARFF data for the inject step
      protected org.pentaho.di.core.row.RowMetaInterface m_outputRowMeta
      The (eventual) outgoing row meta data
      protected Random m_random
      random number generator
      protected List<Object[]> m_sample
      Holds sampled rows
      protected String m_sampleRelationName
      Relation name for Instances created from the sampled rows
      protected boolean m_streamData
      True if data is to be injected one instance at a time
      protected weka.core.Instances m_streamingHeader
      Header for instances that are to be streamed.
      static String[] OUTPUT_EVENTS
      Allowable types of events that we can listen for from the knowledge flow
    • Constructor Summary

      Constructors 
      Constructor Description
      KFData()  
    • Method Summary

      All Methods Static Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      protected void allocate(int num)
      Allocate an array to hold meta data for the ARFF instances
      void cleanUp()
      Free memory used by the reservoir/cache
      protected static weka.gui.beans.BeanInstance findKFStep(Vector flow, String stepName)
      protected static String flowToXML(Vector<Vector<?>> flow)
      void flushStreamingBuffer(org.pentaho.di.core.row.RowMetaInterface inputMeta, Object[] inputRow)
      protected static ArrayList<String> getAllAllowableOutputEventNames(Vector flow, String compName)
      Get a list of all event names that we can accept from the supplied knowledge flow component.
      protected static ArrayList<String> getAllAllowableOutputStepNames(Vector flow)
      Return a list of all valid output steps in the flow (i.e. those that produce at least one event that we can accept).
      protected static ArrayList<String> getAllInjectStepNames(Vector flow)
      Look for all knowledge flow components that are of type "KettleInject"
      protected boolean getBufferingForStreaming()
      Returns true if streaming has been selected and there are nominal attributes in the incoming data.
      protected static ArrayList<String> getEventsForNamedStep(Vector flow, String compName)
      Looks for the named step, and then returns all events (from those that we can generate) that this step can accept at present
      org.pentaho.dm.commons.ArffMeta[] getInjectFields()
      protected boolean getInjectSetupOK()
      Get the status of the flow with regards to the inject step.
      protected org.pentaho.di.core.row.RowMetaInterface getOutputRowMeta()
      Get the output row meta data.
      protected boolean getStreamData()
      Get whether data is to be streamed to the flow.
      protected void initializeReservoir(int sampleSize, int seed)
      Initialize the reservoir/cache for the requested sample (or cache) size.
      protected void injectData(weka.core.Instances data, Vector beans)
      Inject batch data into the knowledge flow.
      protected void injectInstance(weka.core.Instance inst)
      Inject a single instance into the knowledge flow.
      protected static Vector<Vector<?>> loadSerializedKF(File kfFile)
      Loads a serialized knowledge flow
      protected void processRow(Object[] inputRow, org.pentaho.di.core.row.RowMetaInterface inputMeta)
      Processes a single incoming row.
      protected weka.core.Instances reservoirToInstances(org.pentaho.di.core.row.RowMetaInterface inputRowMeta)
      Convert the contents of the reservoir into a set of Instances.
      protected void setClassAttributeName(String classAttName)
      Set the name of the attribute to use as the class.
      protected void setIncrementalStatus(int status)
      Set the status of the current incremental event (FORMAT_AVAILABLE or INSTANCE_AVAILABLE).
      protected void setInjectEventName(String injectEventName)
      Set the name of the event used to inject data.
      protected void setInjectFieldIndexes(int[] injectFieldIndexes, org.pentaho.dm.commons.ArffMeta[] arffMetas)
      Set the indexes of the fields to inject into the knowledge flow
      void setInjectFields(org.pentaho.dm.commons.ArffMeta[] arffMeta)
      protected void setInjectSetupOK(boolean ok)
      Sets the status of the flow with regards to the step to inject into.
      protected void setInjectStep(weka.gui.beans.BeanInstance injectStep)
      Set the knowledge flow step to inject data into
      void setLog(org.pentaho.di.core.logging.LogChannelInterface log)
      protected void setOutputRowMeta(org.pentaho.di.core.row.RowMetaInterface rmi)
      Set the meta data for the incoming rows; it is later modified into the output format by getFields() in the meta class.
      protected void setSampleRelationName(String relationName)
      Set the relation name for the sample.
      protected void setStreamData(boolean streamData)
      Set whether the flow is to have data streamed to it (i.e. instance events).
      protected void setupArffMeta(org.pentaho.di.core.row.RowMetaInterface rmi)
      Sets up the ArffMeta array based on the incoming Kettle row format.
      protected static void validateInjectSetup(Vector flow, String injectStepName, String injectEventName)
      Attempts to find the requested inject step and check if the requested event type is one that this step generates (and will allow a connection).
      protected static void validateOutputSetup(Vector flow, Object outputListener, String outputStepName, String outputEventName)
      Attempts to find the requested output step, checks that the requested output event type can be generated and then tries to register the supplied listener with the output step.
      protected static Vector<Vector<?>> xmlToFlow(String xml)
      • Methods inherited from class org.pentaho.di.trans.step.BaseStepData

        getStatus, isDisposed, isEmpty, isFinished, isIdle, isInitialising, isRunning, isStopped, setStatus
      • Methods inherited from interface org.pentaho.di.trans.step.StepDataInterface

        getStatus, isDisposed, isEmpty, isFinished, isIdle, isInitialising, isRunning, setStatus
    • Field Detail

      • m_outputRowMeta

        protected org.pentaho.di.core.row.RowMetaInterface m_outputRowMeta
        The (eventual) outgoing row meta data
      • m_injectFields

        protected org.pentaho.dm.commons.ArffMeta[] m_injectFields
        Meta data for the ARFF instances input to the inject step
      • m_injectSetupOK

        protected boolean m_injectSetupOK
        If the inject setup validates, then we can process rows for input into the flow
      • m_injectArffMetas

        protected org.pentaho.dm.commons.ArffMeta[] m_injectArffMetas
        Meta data for the fields to be injected into the knowledge flow
      • m_injectFieldIndexes

        protected int[] m_injectFieldIndexes
        Mapping to fields in incoming kettle data
      • m_hasNominalAtts

        protected boolean m_hasNominalAtts
        True if incoming data has nominal attributes
      • m_streamData

        protected boolean m_streamData
        True if data is to be injected one instance at a time
      • m_incrementalStatus

        protected int m_incrementalStatus
        Status to use for incremental instance events
      • m_streamingHeader

        protected weka.core.Instances m_streamingHeader
        Header for instances that are to be streamed. If there are no nominal attributes in the incoming data, then this will get constructed on the first incoming row. Otherwise, it will be constructed after we have seen x rows (where x is the number of rows to cache in order to determine legal nominal values).
      • m_classAttributeName

        protected String m_classAttributeName
        The name of the class attribute (if any).
      • m_bufferingComplete

        protected boolean m_bufferingComplete
        True when buffering for header creation has been completed for streaming setups
      • m_injectStep

        protected weka.gui.beans.BeanInstance m_injectStep
        The step in the knowledge flow to inject into
      • m_injectEventName

        protected String m_injectEventName
        The name of the event to use for injecting (must be an instance event for streaming)
      • m_nominalVals

        protected Map<Object,Object>[] m_nominalVals
        Maps to hold the nominal values of the ARFF data for the inject step
      • m_k

        protected int m_k
        the size of the sample
      • m_currentRow

        protected int m_currentRow
        the current row number
      • m_sample

        protected List<Object[]> m_sample
        Holds sampled rows
      • m_sampleRelationName

        protected String m_sampleRelationName
        Relation name for Instances created from the sampled rows
      • m_random

        protected Random m_random
        random number generator
      • m_log

        protected org.pentaho.di.core.logging.LogChannelInterface m_log
        The log channel used for logging
      • INJECT_EVENTS

        public static final String[] INJECT_EVENTS
        Allowable types of events to generate to inject data into the knowledge flow
      • OUTPUT_EVENTS

        public static final String[] OUTPUT_EVENTS
        Allowable types of events that we can listen for from the knowledge flow
    • Constructor Detail

      • KFData

        public KFData()
    • Method Detail

      • setLog

        public void setLog(org.pentaho.di.core.logging.LogChannelInterface log)
      • setInjectFields

        public void setInjectFields(org.pentaho.dm.commons.ArffMeta[] arffMeta)
      • getInjectFields

        public org.pentaho.dm.commons.ArffMeta[] getInjectFields()
      • allocate

        protected void allocate(int num)
        Allocate an array to hold meta data for the ARFF instances
        Parameters:
        num - number of meta data objects to allocate
      • setupArffMeta

        protected void setupArffMeta(org.pentaho.di.core.row.RowMetaInterface rmi)
        Sets up the ArffMeta array based on the incoming Kettle row format.
        Parameters:
        rmi - the meta data describing the incoming Kettle rows
      • initializeReservoir

        protected void initializeReservoir(int sampleSize,
                                           int seed)
        Initialize the reservoir/cache for the requested sample (or cache) size.
        Parameters:
        sampleSize - the number of rows to sample or to cache
        seed - the random number seed to use
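The reservoir/cache initialized here can be sketched with the classic reservoir-sampling scheme (Algorithm R). This is a minimal, JDK-only sketch under stated assumptions: the class `ReservoirSketch` and its methods are hypothetical, its fields only mirror `m_k`, `m_currentRow`, `m_sample` and `m_random`, and this page does not say which sampling algorithm `KFData` actually uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for the reservoir fields of KFData (m_k, m_currentRow,
// m_sample, m_random); not the actual Pentaho implementation.
class ReservoirSketch {
  private final int k;                 // sample (or cache) size, like m_k
  private int currentRow;              // rows seen so far, like m_currentRow
  private final List<Object[]> sample; // sampled rows, like m_sample
  private final Random random;         // seeded generator, like m_random

  ReservoirSketch(int sampleSize, int seed) {
    this.k = sampleSize;
    this.currentRow = 0;
    this.sample = new ArrayList<>(sampleSize);
    this.random = new Random(seed);
  }

  // Algorithm R: keep the first k rows; afterwards the i-th row (0-based)
  // replaces a random slot with probability k / (i + 1).
  void offer(Object[] row) {
    if (currentRow < k) {
      sample.add(row);
    } else {
      int j = random.nextInt(currentRow + 1); // uniform in [0, currentRow]
      if (j < k) {
        sample.set(j, row);
      }
    }
    currentRow++;
  }

  List<Object[]> sample() { return sample; }

  int rowsSeen() { return currentRow; }
}
```

When `sampleSize` exceeds the number of incoming rows, every row is kept in arrival order, so the same structure can double as a plain cache; fixing the seed makes the sample reproducible.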
      • setInjectFieldIndexes

        protected void setInjectFieldIndexes(int[] injectFieldIndexes,
                                             org.pentaho.dm.commons.ArffMeta[] arffMetas)
        Set the indexes of the fields to inject into the knowledge flow
        Parameters:
        injectFieldIndexes - array of indexes of the fields in the incoming Kettle data
        arffMetas - array of ArffMeta objects describing the fields to inject
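The field-index mapping can be illustrated by projecting an incoming row onto the fields to inject. This is a hedged sketch: it assumes `injectFieldIndexes[i]` holds the position in the incoming row of the i-th injected field, with `-1` for a field that is absent; that convention and the class `FieldProjectionSketch` are hypothetical, and real rows would be described by a `RowMetaInterface`.

```java
// Hypothetical helper: picks the values for the injected fields out of an
// incoming row, assuming injectFieldIndexes[i] is the position in the
// incoming row of the i-th field to inject, or -1 if that field is absent.
class FieldProjectionSketch {
  static Object[] project(Object[] incomingRow, int[] injectFieldIndexes) {
    Object[] out = new Object[injectFieldIndexes.length];
    for (int i = 0; i < injectFieldIndexes.length; i++) {
      int idx = injectFieldIndexes[i];
      // Absent or out-of-range fields become null (a missing value).
      out[i] = (idx >= 0 && idx < incomingRow.length) ? incomingRow[idx] : null;
    }
    return out;
  }
}
```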
      • setInjectSetupOK

        protected void setInjectSetupOK(boolean ok)
        Sets the status of the flow with regards to the step to inject into.
        Parameters:
        ok - true if the inject step is good to go
      • getInjectSetupOK

        protected boolean getInjectSetupOK()
        Get the status of the flow with regards to the inject step.
        Returns:
        true if the flow is good to go
      • setStreamData

        protected void setStreamData(boolean streamData)
        Set whether the flow is to have data streamed to it (i.e. instance events).
        Parameters:
        streamData - true if data is to be streamed to the knowledge flow
      • getStreamData

        protected boolean getStreamData()
        Get whether data is to be streamed to the flow.
        Returns:
        true if data is to be streamed to the flow
      • setIncrementalStatus

        protected void setIncrementalStatus(int status)
        Set the status of the current incremental event (FORMAT_AVAILABLE or INSTANCE_AVAILABLE).
        Parameters:
        status - the status of the current incremental event
      • getBufferingForStreaming

        protected boolean getBufferingForStreaming()
        Returns true if streaming has been selected and there are nominal attributes in the incoming data. In this case, we have to buffer some data in order to determine nominal values.
        Returns:
        true if streaming is selected and we will be buffering data (i.e. there are nominal attributes in the incoming data)
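The buffering is needed because a nominal attribute's legal values must be known before a header can be built. A minimal sketch, assuming buffered rows are `Object[]` arrays and that each nominal column's distinct values are collected into an insertion-ordered map (mirroring the `Map<Object,Object>[]` type of `m_nominalVals`); the class `NominalValueSketch` and its method are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: scan buffered rows and record the distinct values of
// each nominal column. Map keys are the observed values; insertion order gives
// a stable value order for the eventual header.
class NominalValueSketch {
  @SuppressWarnings("unchecked")
  static Map<Object, Object>[] collect(List<Object[]> bufferedRows, boolean[] isNominal) {
    Map<Object, Object>[] nominalVals = new Map[isNominal.length];
    for (int col = 0; col < isNominal.length; col++) {
      if (isNominal[col]) {
        nominalVals[col] = new LinkedHashMap<>();
      }
    }
    for (Object[] row : bufferedRows) {
      for (int col = 0; col < isNominal.length; col++) {
        Object value = row[col];
        // Skip non-nominal columns (null map) and missing (null) values.
        if (nominalVals[col] != null && value != null) {
          nominalVals[col].putIfAbsent(value, value);
        }
      }
    }
    return nominalVals;
  }
}
```

Numeric columns keep a `null` map, so the same array can be indexed by column position for every field.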
      • setClassAttributeName

        protected void setClassAttributeName(String classAttName)
        Set the name of the attribute to use as the class.
        Parameters:
        classAttName - the name of the class attribute
      • setOutputRowMeta

        protected void setOutputRowMeta(org.pentaho.di.core.row.RowMetaInterface rmi)
        Set the meta data for the incoming rows; it is later modified into the output format by getFields() in the meta class.
        Parameters:
        rmi - the incoming row meta data
      • getOutputRowMeta

        protected org.pentaho.di.core.row.RowMetaInterface getOutputRowMeta()
        Get the output row meta data.
        Returns:
        the output row meta data
      • setSampleRelationName

        protected void setSampleRelationName(String relationName)
        Set the relation name for the sample.
        Parameters:
        relationName - the relation name to use for the sample
      • setInjectStep

        protected void setInjectStep(weka.gui.beans.BeanInstance injectStep)
        Set the knowledge flow step to inject data into
        Parameters:
        injectStep - the knowledge flow step to inject data into
      • setInjectEventName

        protected void setInjectEventName(String injectEventName)
        Set the name of the event used to inject data.
        Parameters:
        injectEventName - the name of the event to inject data with
      • loadSerializedKF

        protected static Vector<Vector<?>> loadSerializedKF(File kfFile)
                                                     throws Exception
        Loads a serialized knowledge flow.
        Parameters:
        kfFile - the file containing the serialized knowledge flow
        Returns:
        a Vector containing a Vector of beans and a Vector of connections
        Throws:
        Exception - if there is a problem during loading
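Loading such a flow can be sketched with standard Java deserialization. This is a sketch under assumptions: it presumes the file holds a single Java-serialized `Vector` whose element 0 is the Vector of beans and element 1 the Vector of connections, as the return description above states; the class `SerializedFlowSketch` is hypothetical and the actual on-disk format is not specified on this page. Only deserialize files from trusted sources.

```java
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Vector;

// Hypothetical sketch: assumes the file holds one Java-serialized Vector whose
// element 0 is the Vector of beans and element 1 the Vector of connections.
class SerializedFlowSketch {
  @SuppressWarnings("unchecked")
  static Vector<Vector<?>> load(File kfFile) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in =
        new ObjectInputStream(new BufferedInputStream(new FileInputStream(kfFile)))) {
      Object obj = in.readObject();
      if (!(obj instanceof Vector)) {
        String found = (obj == null) ? "null" : obj.getClass().getName();
        throw new IOException("Not a serialized flow: " + found);
      }
      Vector<Vector<?>> flow = (Vector<Vector<?>>) obj;
      // Expect at least the beans and the connections.
      if (flow.size() < 2) {
        throw new IOException("Expected beans and connections, found " + flow.size() + " element(s)");
      }
      return flow;
    }
  }
}
```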
      • reservoirToInstances

        protected weka.core.Instances reservoirToInstances(org.pentaho.di.core.row.RowMetaInterface inputRowMeta)
                                                    throws org.pentaho.di.core.exception.KettleException
        Convert the contents of the reservoir into a set of Instances.
        Parameters:
        inputRowMeta - the meta data for the incoming rows
        Returns:
        an Instances object holding all the data in the reservoir
        Throws:
        org.pentaho.di.core.exception.KettleException - if there is a problem during the conversion
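Weka stores every attribute value of an instance as a `double`: a nominal value is stored as the index of that value in the attribute's declared value list, and a missing value as `NaN`. The value encoding behind a reservoir-to-Instances conversion can therefore be sketched without Weka itself. This is a hedged sketch: the class `RowEncodingSketch` is hypothetical, numeric Kettle values are assumed to arrive as `Number` objects, and treating an unseen nominal value as missing is an assumption, not documented behaviour.

```java
import java.util.List;

// Hypothetical sketch of the value encoding behind a reservoir-to-Instances
// conversion: numeric values become doubles, nominal values become the index
// of the value in that column's declared value list, and nulls become NaN
// (Weka's representation of a missing value). Not the actual KFData code.
class RowEncodingSketch {
  static double[][] encode(List<Object[]> rows, List<List<Object>> nominalValues) {
    double[][] data = new double[rows.size()][];
    for (int r = 0; r < rows.size(); r++) {
      Object[] row = rows.get(r);
      double[] vals = new double[row.length];
      for (int c = 0; c < row.length; c++) {
        Object v = row[c];
        List<Object> legal = nominalValues.get(c); // null for numeric columns
        if (v == null) {
          vals[c] = Double.NaN;
        } else if (legal != null) {
          int idx = legal.indexOf(v);
          // Assumption: a value not seen while building the header is missing.
          vals[c] = idx >= 0 ? idx : Double.NaN;
        } else {
          vals[c] = ((Number) v).doubleValue();
        }
      }
      data[r] = vals;
    }
    return data;
  }
}
```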
      • injectInstance

        protected void injectInstance(weka.core.Instance inst)
        Inject a single instance into the knowledge flow. Used in streaming setups.
        Parameters:
        inst - the instance to inject
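In a streaming setup the receiving step has to learn the structure of the data before any instance arrives. A minimal sketch of that ordering, assuming the header is announced once with `FORMAT_AVAILABLE` and every instance is then sent with `INSTANCE_AVAILABLE` (the two statuses named under `setIncrementalStatus`); the enum, the listener interface and `StreamingInjectorSketch` are hypothetical stand-ins, not Weka's `InstanceEvent` API.

```java
// Hypothetical stand-ins for the incremental event status and for the
// knowledge flow step that receives injected data.
enum IncrementalStatus { FORMAT_AVAILABLE, INSTANCE_AVAILABLE }

interface IncrementalListener {
  void accept(IncrementalStatus status, Object payload);
}

class StreamingInjectorSketch {
  private final Object header;            // structure of the streamed data
  private final IncrementalListener target;
  private boolean formatSent = false;

  StreamingInjectorSketch(Object header, IncrementalListener target) {
    this.header = header;
    this.target = target;
  }

  // Announces the structure once (FORMAT_AVAILABLE) before the first
  // instance, then delivers every instance with INSTANCE_AVAILABLE.
  void inject(Object[] instance) {
    if (!formatSent) {
      target.accept(IncrementalStatus.FORMAT_AVAILABLE, header);
      formatSent = true;
    }
    target.accept(IncrementalStatus.INSTANCE_AVAILABLE, instance);
  }
}
```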
      • injectData

        protected void injectData(weka.core.Instances data,
                                  Vector beans)
                           throws Exception
        Inject batch data into the knowledge flow.
        Parameters:
        data - the instances to inject
        beans - the flow being executed
        Throws:
        Exception
      • processRow

        protected void processRow(Object[] inputRow,
                                  org.pentaho.di.core.row.RowMetaInterface inputMeta)
                           throws Exception
        Processes a single incoming row. Depending on the setup, this may entail checking to see if this row should go into the reservoir/cache or be passed directly on to the knowledge flow.
        Parameters:
        inputRow - the incoming Kettle row
        inputMeta - the row meta data
        Throws:
        Exception - if something goes wrong
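The routing decision described for `processRow` can be sketched from the documented meanings of `m_streamData`, `m_hasNominalAtts` and `m_bufferingComplete` alone. This is a hedged reconstruction, not the actual `KFData` logic: in batch mode, or while a streaming setup is still buffering to discover nominal values, the row is cached; otherwise it goes straight to the flow. `RowRoutingSketch` and its `Route` enum are hypothetical.

```java
// Hypothetical sketch of the routing decision in processRow, derived from
// the documented meaning of m_streamData, m_hasNominalAtts and
// m_bufferingComplete. Not the actual KFData logic.
class RowRoutingSketch {
  enum Route { CACHE, INJECT }

  static Route route(boolean streamData, boolean hasNominalAtts, boolean bufferingComplete) {
    if (!streamData) {
      return Route.CACHE;  // batch mode: rows go into the reservoir/cache
    }
    if (hasNominalAtts && !bufferingComplete) {
      return Route.CACHE;  // still buffering to determine legal nominal values
    }
    return Route.INJECT;   // header is known: pass the row directly to the flow
  }
}
```

The middle condition is the same test that `getBufferingForStreaming` reports (streaming selected and nominal attributes present), combined with whether buffering has finished.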
      • flushStreamingBuffer

        public void flushStreamingBuffer(org.pentaho.di.core.row.RowMetaInterface inputMeta,
                                         Object[] inputRow)
                                  throws org.pentaho.di.core.exception.KettleException
        Throws:
        org.pentaho.di.core.exception.KettleException
      • cleanUp

        public void cleanUp()
        Free memory used by the reservoir/cache
      • findKFStep

        protected static weka.gui.beans.BeanInstance findKFStep(Vector flow,
                                                                String stepName)
      • getAllAllowableOutputStepNames

        protected static ArrayList<String> getAllAllowableOutputStepNames(Vector flow)
        Return a list of all valid output steps in the flow (i.e. those that produce at least one event that we can accept).
        Parameters:
        flow - the flow to use
        Returns:
        a list of valid output steps
      • getAllAllowableOutputEventNames

        protected static ArrayList<String> getAllAllowableOutputEventNames(Vector flow,
                                                                           String compName)
        Get a list of all event names that we can accept from the supplied knowledge flow component.
        Parameters:
        flow - the flow to use
        compName - the name of the component to generate the list for
        Returns:
        the list of event names that we can accept from the supplied component
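Both `getAllAllowableOutputStepNames` and `getAllAllowableOutputEventNames` can be read as intersecting the events a component produces with the events we accept (`OUTPUT_EVENTS`). A sketch under assumptions: the flow is modelled as a plain map from component name to produced event names, and `OutputEventSketch` is hypothetical; the real methods work on a `Vector` of beans and connections.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model: each knowledge flow component is reduced to the list of
// event names it can produce.
class OutputEventSketch {
  // Events produced by compName that are also acceptable (cf. OUTPUT_EVENTS).
  static List<String> allowableEventNames(Map<String, List<String>> flow, String compName,
      String[] acceptable) {
    Set<String> accepted = new HashSet<>(Arrays.asList(acceptable));
    List<String> result = new ArrayList<>();
    for (String event : flow.getOrDefault(compName, Collections.emptyList())) {
      if (accepted.contains(event)) {
        result.add(event);
      }
    }
    return result;
  }

  // Components that produce at least one acceptable event.
  static List<String> allowableStepNames(Map<String, List<String>> flow, String[] acceptable) {
    List<String> result = new ArrayList<>();
    for (String name : flow.keySet()) {
      if (!allowableEventNames(flow, name, acceptable).isEmpty()) {
        result.add(name);
      }
    }
    return result;
  }
}
```

Defining the step list in terms of the event list keeps the two answers consistent: a step is listed exactly when its event list is non-empty.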
      • getAllInjectStepNames

        protected static ArrayList<String> getAllInjectStepNames(Vector flow)
        Look for all knowledge flow components that are of type "KettleInject"
        Parameters:
        flow - the flow to use
        Returns:
        an ArrayList of the names of knowledge flow components that are of type KettleInject
      • getEventsForNamedStep

        protected static ArrayList<String> getEventsForNamedStep(Vector flow,
                                                                 String compName)
        Looks for the named step, and then returns all events (from those that we can generate) that this step can accept at present.
        Parameters:
        flow - the flow to use
        compName - the knowledge flow step in question
        Returns:
        an ArrayList of the acceptable events (connections)
      • validateOutputSetup

        protected static void validateOutputSetup(Vector flow,
                                                  Object outputListener,
                                                  String outputStepName,
                                                  String outputEventName)
                                           throws org.pentaho.di.core.exception.KettleStepException
        Attempts to find the requested output step, checks that the requested output event type can be generated and then tries to register the supplied listener with the output step.
        Parameters:
        flow - a Vector containing a Vector of beans and a Vector of connections
        outputListener - the object to register with the output step (may be null if just validation is required)
        outputStepName - the name of the output Knowledge Flow step
        outputEventName - the name of the event produced by the output step to listen for
        Throws:
        org.pentaho.di.core.exception.KettleStepException - if a problem occurs
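The contract of `validateOutputSetup` (find the step, check that it can produce the event, and register the listener only when one is supplied) can be sketched as follows. The `FlowStepSketch` class, its `addListener` method, and the use of `IllegalArgumentException` in place of `KettleStepException` are hypothetical simplifications so the example runs with the JDK alone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of a knowledge flow step for validation purposes.
class FlowStepSketch {
  final List<String> producedEvents;
  final List<Object> listeners = new ArrayList<>();

  FlowStepSketch(List<String> producedEvents) { this.producedEvents = producedEvents; }

  void addListener(Object listener) { listeners.add(listener); }
}

class OutputValidationSketch {
  // Throws if the step is missing or cannot produce the event; registers the
  // listener only when one is supplied (null means "validate only").
  static void validateOutputSetup(Map<String, FlowStepSketch> flow, Object outputListener,
      String outputStepName, String outputEventName) {
    FlowStepSketch step = flow.get(outputStepName);
    if (step == null) {
      throw new IllegalArgumentException("Output step not found: " + outputStepName);
    }
    if (!step.producedEvents.contains(outputEventName)) {
      throw new IllegalArgumentException(
          "Step " + outputStepName + " does not produce event " + outputEventName);
    }
    if (outputListener != null) {
      step.addListener(outputListener);
    }
  }
}
```

Passing `null` as the listener gives a pure validation pass, which matches the parameter description above.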
      • validateInjectSetup

        protected static void validateInjectSetup(Vector flow,
                                                  String injectStepName,
                                                  String injectEventName)
                                           throws org.pentaho.di.core.exception.KettleStepException
        Attempts to find the requested inject step and check if the requested event type is one that this step generates (and will allow a connection).
        Parameters:
        flow - the flow to use
        injectStepName - the name of the Knowledge Flow step to inject data into
        injectEventName - the name of the event type to use to pass data to the inject step
        Throws:
        org.pentaho.di.core.exception.KettleStepException - if validation fails for some reason