Changeset 3040


Ignore:
Timestamp:
01/16/12 14:54:39 (4 months ago)
Author:
kasper
Message:

Ticket #747: Added solution, large refactoring AnalysisListener?

Files:
9 added
15 edited

Legend:

Unmodified
Added
Removed
  • AnalyzerBeans/trunk/cli/src/main/java/org/eobjects/analyzer/cli/CliProgressAnalysisListener.java

    r2919 r3040  
    3030import org.eobjects.analyzer.job.FilterJob; 
    3131import org.eobjects.analyzer.job.TransformerJob; 
     32import org.eobjects.analyzer.job.runner.AnalysisJobMetrics; 
    3233import org.eobjects.analyzer.job.runner.AnalysisListener; 
     34import org.eobjects.analyzer.job.runner.AnalyzerMetrics; 
     35import org.eobjects.analyzer.job.runner.ExplorerMetrics; 
     36import org.eobjects.analyzer.job.runner.RowProcessingMetrics; 
    3337import org.eobjects.analyzer.result.AnalyzerResult; 
    3438 
     
    4044 
    4145        @Override 
    42         public void jobBegin(AnalysisJob job) { 
     46        public void jobBegin(AnalysisJob job, AnalysisJobMetrics metrics) { 
    4347        } 
    4448 
    4549        @Override 
    46         public void jobSuccess(AnalysisJob job) { 
     50        public void jobSuccess(AnalysisJob job, AnalysisJobMetrics metrics) { 
    4751        } 
    4852 
    4953        @Override 
    50         public void rowProcessingBegin(AnalysisJob job, Table table, int expectedRows) { 
    51                 System.out.println("Analyzing " + expectedRows + " rows from table: " + table.getName()); 
     54        public void rowProcessingBegin(AnalysisJob job, RowProcessingMetrics metrics) { 
     55                Table table = metrics.getTable(); 
     56                System.out.println("Analyzing rows from table: " + table.getName()); 
    5257                rowCounts.put(table, new AtomicInteger(0)); 
    5358        } 
    5459 
    5560        @Override 
    56         public void rowProcessingProgress(AnalysisJob job, Table table, int currentRow) { 
     61        public void rowProcessingProgress(AnalysisJob job, RowProcessingMetrics metrics, int currentRow) { 
     62                Table table = metrics.getTable(); 
    5763                AtomicInteger rowCount = rowCounts.get(table); 
    5864                if (rowCount != null) { 
     
    6874 
    6975        @Override 
    70         public void rowProcessingSuccess(AnalysisJob job, Table table) { 
    71                 System.out.println("Done processing rows from table: " + table.getName()); 
     76        public void rowProcessingSuccess(AnalysisJob job, RowProcessingMetrics metrics) { 
     77                System.out.println("Done processing rows from table: " + metrics.getTable().getName()); 
    7278        } 
    7379 
    7480        @Override 
    75         public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob) { 
     81        public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob, AnalyzerMetrics metrics) { 
    7682        } 
    7783 
     
    97103 
    98104        @Override 
    99         public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob) { 
     105        public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob, ExplorerMetrics metrics) { 
    100106        } 
    101107 
  • AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/ExplorerJob.java

    r2631 r3040  
    2323 
    2424public interface ExplorerJob extends ComponentJob { 
    25  
     25         
    2626        public ExplorerBeanDescriptor<?> getDescriptor(); 
    2727 
  • AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/concurrent/JobCompletionTaskListener.java

    r2648 r3040  
    2323import java.util.concurrent.TimeUnit; 
    2424 
    25 import org.eobjects.analyzer.job.AnalysisJob; 
     25import org.eobjects.analyzer.job.runner.AnalysisJobMetrics; 
    2626import org.eobjects.analyzer.job.runner.AnalysisListener; 
    2727import org.eobjects.analyzer.job.tasks.Task; 
     
    4040 
    4141        private final CountDownLatch _countDownLatch; 
    42         private final AnalysisJob _job; 
    4342        private final AnalysisListener _analysisListener; 
     43        private final AnalysisJobMetrics _analysisJobMetrics; 
    4444 
    45         public JobCompletionTaskListener(AnalysisJob job, AnalysisListener analysisListener, int callablesToWaitFor) { 
    46                 _job = job; 
     45        public JobCompletionTaskListener(AnalysisJobMetrics analysisJobMetrics, AnalysisListener analysisListener, 
     46                        int callablesToWaitFor) { 
     47                _analysisJobMetrics = analysisJobMetrics; 
    4748                _analysisListener = analysisListener; 
    4849                _countDownLatch = new CountDownLatch(callablesToWaitFor); 
     
    7172                _countDownLatch.countDown(); 
    7273                if (_countDownLatch.getCount() == 0) { 
    73                         _analysisListener.jobSuccess(_job); 
     74                        _analysisListener.jobSuccess(_analysisJobMetrics.getAnalysisJob(), _analysisJobMetrics); 
    7475                } 
    7576        } 
     
    7879        public void onError(Task task, Throwable throwable) { 
    7980                logger.debug("onError(...)"); 
    80                 _analysisListener.errorUknown(_job, throwable); 
     81                _analysisListener.errorUknown(_analysisJobMetrics.getAnalysisJob(), throwable); 
    8182                _countDownLatch.countDown(); 
    8283        } 
  • AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/AnalysisListener.java

    r2919 r3040  
    2727import org.eobjects.analyzer.job.TransformerJob; 
    2828import org.eobjects.analyzer.result.AnalyzerResult; 
    29 import org.eobjects.metamodel.schema.Table; 
    3029 
    3130/** 
     
    3938public interface AnalysisListener { 
    4039 
    41         public void jobBegin(AnalysisJob job); 
     40        public void jobBegin(AnalysisJob job, AnalysisJobMetrics metrics); 
    4241 
    43         public void jobSuccess(AnalysisJob job); 
     42        public void jobSuccess(AnalysisJob job, AnalysisJobMetrics metrics); 
    4443 
    4544        /** 
     
    5150         *            the count could not be determined. 
    5251         */ 
    53         public void rowProcessingBegin(AnalysisJob job, Table table, int expectedRows); 
     52        public void rowProcessingBegin(AnalysisJob job, RowProcessingMetrics metrics); 
    5453 
    55         public void rowProcessingProgress(AnalysisJob job, Table table, int currentRow); 
     54        public void rowProcessingProgress(AnalysisJob job, RowProcessingMetrics metrics, int currentRow); 
    5655 
    57         public void rowProcessingSuccess(AnalysisJob job, Table table); 
     56        public void rowProcessingSuccess(AnalysisJob job, RowProcessingMetrics metrics); 
    5857 
    59         public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob); 
     58        public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob, AnalyzerMetrics metrics); 
    6059 
    61         public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob); 
     60        public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob, ExplorerMetrics metrics); 
    6261 
    6362        public void analyzerSuccess(AnalysisJob job, AnalyzerJob analyzerJob, AnalyzerResult result); 
  • AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/AnalysisRunnerJobDelegate.java

    r3014 r3040  
    2222import java.util.ArrayList; 
    2323import java.util.Collection; 
    24 import java.util.HashMap; 
    25 import java.util.HashSet; 
    2624import java.util.List; 
    27 import java.util.Map; 
    2825import java.util.Queue; 
    29 import java.util.Set; 
    30  
    31 import org.eobjects.analyzer.beans.api.Analyzer; 
     26 
    3227import org.eobjects.analyzer.beans.api.Explorer; 
    33 import org.eobjects.analyzer.beans.api.Filter; 
    34 import org.eobjects.analyzer.beans.api.Transformer; 
    3528import org.eobjects.analyzer.configuration.AnalyzerBeansConfiguration; 
    3629import org.eobjects.analyzer.configuration.InjectionManager; 
     
    4639import org.eobjects.analyzer.job.MergeInput; 
    4740import org.eobjects.analyzer.job.MergedOutcomeJob; 
    48 import org.eobjects.analyzer.job.Outcome; 
    4941import org.eobjects.analyzer.job.TransformerJob; 
    5042import org.eobjects.analyzer.job.concurrent.ForkTaskListener; 
     
    6557import org.eobjects.analyzer.lifecycle.LifeCycleHelper; 
    6658import org.eobjects.analyzer.util.SourceColumnFinder; 
    67 import org.eobjects.metamodel.MetaModelHelper; 
    68 import org.eobjects.metamodel.schema.Column; 
    6959import org.eobjects.metamodel.schema.Table; 
    7060import org.slf4j.Logger; 
     
    8878        private final AnalysisListener _analysisListener; 
    8979        private final Queue<JobAndResult> _resultQueue; 
    90         private final JobCompletionTaskListener _jobCompletionTaskListener; 
    9180        private final ErrorAware _errorAware; 
    9281        private final Datastore _datastore; 
     
    10998                _sourceColumnFinder.addSources(_job); 
    11099 
    111                 // A task listener that will register either succesfull executions or 
    112                 // unexpected errors (which will be delegated to the errorListener) 
    113                 _jobCompletionTaskListener = new JobCompletionTaskListener(job, analysisListener, 2); 
    114100                _errorAware = errorAware; 
    115101 
     
    129115         */ 
    130116        public AnalysisResultFuture run() { 
    131                 _analysisListener.jobBegin(_job); 
    132  
    133                 validateSingleTableInput(_transformerJobs); 
    134                 validateSingleTableInput(_filterJobs); 
    135                 validateSingleTableInputForMergedOutcomes(_mergedOutcomeJobs); 
    136  
    137                 validateSingleTableInput(_analyzerJobs); 
    138  
    139                 // the injection manager is job scoped 
    140                 final InjectionManager injectionManager = _configuration.getInjectionManagerFactory().getInjectionManager(_job); 
    141  
    142                 // at this point we are done validating the job, it will run. 
    143                 scheduleExplorers(new LifeCycleHelper(injectionManager, new ReferenceDataActivationManager())); 
    144                 scheduleRowProcessing(new LifeCycleHelper(injectionManager, new ReferenceDataActivationManager())); 
    145  
    146                 return new AnalysisResultFutureImpl(_resultQueue, _jobCompletionTaskListener, _errorAware); 
     117                try { 
     118                        // the injection manager is job scoped 
     119                        final InjectionManager injectionManager = _configuration.getInjectionManagerFactory().getInjectionManager(_job); 
     120 
     121                        final LifeCycleHelper explorerLifeCycleHelper = new LifeCycleHelper(injectionManager, 
     122                                        new ReferenceDataActivationManager()); 
     123                        final LifeCycleHelper rowProcessingLifeCycleHelper = new LifeCycleHelper(injectionManager, 
     124                                        new ReferenceDataActivationManager()); 
     125 
     126                        final RowProcessingPublishers publishers = new RowProcessingPublishers(_job, _analysisListener, _taskRunner, 
     127                                        rowProcessingLifeCycleHelper, _sourceColumnFinder); 
     128 
     129                        final AnalysisJobMetrics analysisJobMetrics = new AnalysisJobMetricsImpl(_job, publishers); 
     130 
     131                        // A task listener that will register either succesfull executions 
     132                        // or 
     133                        // unexpected errors (which will be delegated to the errorListener) 
     134                        JobCompletionTaskListener jobCompletionTaskListener = new JobCompletionTaskListener(analysisJobMetrics, 
     135                                        _analysisListener, 2); 
     136 
     137                        _analysisListener.jobBegin(_job, analysisJobMetrics); 
     138 
     139                        validateSingleTableInput(_transformerJobs); 
     140                        validateSingleTableInput(_filterJobs); 
     141                        validateSingleTableInputForMergedOutcomes(_mergedOutcomeJobs); 
     142                        validateSingleTableInput(_analyzerJobs); 
     143 
     144                        // at this point we are done validating the job, it will run. 
     145                        scheduleExplorers(explorerLifeCycleHelper, jobCompletionTaskListener, analysisJobMetrics); 
     146                        scheduleRowProcessing(publishers, rowProcessingLifeCycleHelper, jobCompletionTaskListener, analysisJobMetrics); 
     147 
     148                        return new AnalysisResultFutureImpl(_resultQueue, jobCompletionTaskListener, _errorAware); 
     149                } catch (RuntimeException e) { 
     150                        _analysisListener.errorUknown(_job, e); 
     151                        throw e; 
     152                } 
     153 
    147154        } 
    148155 
     
    150157         * Starts row processing job flows. 
    151158         *  
     159         * @param publishers 
     160         * @param analysisJobMetrics 
     161         *  
    152162         * @param injectionManager 
    153163         */ 
    154         private void scheduleRowProcessing(LifeCycleHelper lifeCycleHelper) { 
    155                 final Map<Table, RowProcessingPublisher> rowProcessingPublishers = new HashMap<Table, RowProcessingPublisher>(); 
    156                 for (FilterJob filterJob : _filterJobs) { 
    157                         registerRowProcessingPublishers(rowProcessingPublishers, filterJob, _analysisListener, _taskRunner, 
    158                                         lifeCycleHelper); 
    159                 } 
    160  
    161                 for (MergedOutcomeJob mergedOutcomeJob : _mergedOutcomeJobs) { 
    162                         registerRowProcessingPublishers(rowProcessingPublishers, mergedOutcomeJob); 
    163                 } 
    164  
    165                 for (TransformerJob transformerJob : _transformerJobs) { 
    166                         registerRowProcessingPublishers(rowProcessingPublishers, transformerJob, _analysisListener, _taskRunner, 
    167                                         lifeCycleHelper); 
    168                 } 
    169                 for (AnalyzerJob analyzerJob : _analyzerJobs) { 
    170                         registerRowProcessingPublishers(rowProcessingPublishers, analyzerJob, _analysisListener, _taskRunner, 
    171                                         lifeCycleHelper); 
    172                 } 
    173  
    174                 logger.info("Created {} row processor publishers", rowProcessingPublishers.size()); 
     164        private void scheduleRowProcessing(RowProcessingPublishers publishers, LifeCycleHelper lifeCycleHelper, 
     165                        JobCompletionTaskListener jobCompletionTaskListener, AnalysisJobMetrics analysisJobMetrics) { 
     166 
     167                logger.info("Created {} row processor publishers", publishers.size()); 
    175168 
    176169                final List<TaskRunnable> finalTasks = new ArrayList<TaskRunnable>(2); 
    177                 finalTasks.add(new TaskRunnable(null, _jobCompletionTaskListener)); 
     170                finalTasks.add(new TaskRunnable(null, jobCompletionTaskListener)); 
    178171                finalTasks.add(new TaskRunnable(null, new CloseReferenceDataTaskListener(lifeCycleHelper))); 
    179172 
     
    181174                                finalTasks); 
    182175 
    183                 final TaskListener rowProcessorPublishersDoneCompletionListener = new JoinTaskListener( 
    184                                 rowProcessingPublishers.size(), finalTaskListener); 
    185  
    186                 for (RowProcessingPublisher rowProcessingPublisher : rowProcessingPublishers.values()) { 
    187                         List<TaskRunnable> initTasks = rowProcessingPublisher.createInitialTasks(_taskRunner, _resultQueue, 
    188                                         rowProcessorPublishersDoneCompletionListener, _datastore); 
     176                final TaskListener rowProcessorPublishersDoneCompletionListener = new JoinTaskListener(publishers.size(), 
     177                                finalTaskListener); 
     178 
     179                final Table[] tables = publishers.getTables(); 
     180                for (Table table : tables) { 
     181                        final RowProcessingPublisher rowProcessingPublisher = publishers.getRowProcessingPublisher(table); 
     182                        final List<TaskRunnable> initTasks = rowProcessingPublisher.createInitialTasks(_taskRunner, _resultQueue, 
     183                                        rowProcessorPublishersDoneCompletionListener, _datastore, analysisJobMetrics); 
    189184                        logger.debug("Scheduling {} tasks for row processing publisher: {}", initTasks.size(), rowProcessingPublisher); 
    190185                        for (TaskRunnable taskRunnable : initTasks) { 
     
    199194         * @param injectionManager 
    200195         */ 
    201         private void scheduleExplorers(final LifeCycleHelper lifeCycleHelper) { 
     196        private void scheduleExplorers(final LifeCycleHelper lifeCycleHelper, 
     197                        final JobCompletionTaskListener jobCompletionTaskListener, final AnalysisJobMetrics analysisJobMetrics) { 
    202198                final int numExplorerJobs = _explorerJobs.size(); 
    203199                if (numExplorerJobs == 0) { 
    204                         _jobCompletionTaskListener.onComplete(null); 
     200                        jobCompletionTaskListener.onComplete(null); 
    205201                        return; 
    206202                } 
    207203 
    208204                final List<TaskRunnable> finalTasks = new ArrayList<TaskRunnable>(); 
    209                 finalTasks.add(new TaskRunnable(null, _jobCompletionTaskListener)); 
     205                finalTasks.add(new TaskRunnable(null, jobCompletionTaskListener)); 
    210206                finalTasks.add(new TaskRunnable(null, new CloseReferenceDataTaskListener(lifeCycleHelper))); 
    211207 
     
    215211                // begin explorer jobs first because they can run independently ( 
    216212                for (ExplorerJob explorerJob : _explorerJobs) { 
    217                         final DatastoreConnection dataContextProvider = _datastore.openConnection(); 
    218  
    219                         ExplorerBeanDescriptor<?> descriptor = explorerJob.getDescriptor(); 
    220                         Explorer<?> explorer = descriptor.newInstance(); 
    221  
    222                         finalTasks.add(new TaskRunnable(null, new CloseResourcesTaskListener(dataContextProvider))); 
     213                        final ExplorerMetrics metrics = analysisJobMetrics.getExplorerMetrics(explorerJob); 
     214 
     215                        final DatastoreConnection connection = _datastore.openConnection(); 
     216 
     217                        final ExplorerBeanDescriptor<?> descriptor = explorerJob.getDescriptor(); 
     218                        final Explorer<?> explorer = descriptor.newInstance(); 
     219 
     220                        finalTasks.add(new TaskRunnable(null, new CloseResourcesTaskListener(connection))); 
    223221                        finalTasks.add(new TaskRunnable(null, new CloseBeanTaskListener(lifeCycleHelper, descriptor, explorer))); 
    224222 
    225223                        // set up scheduling for the explorers 
    226                         Task closeTask = new CollectResultsTask(explorer, _job, explorerJob, _resultQueue, _analysisListener); 
    227                         TaskListener runFinishedListener = new RunNextTaskTaskListener(_taskRunner, closeTask, explorersDoneTaskListener); 
    228                         Task runTask = new RunExplorerTask(explorer, _job, explorerJob, _datastore, _analysisListener); 
     224                        final Task closeTask = new CollectResultsTask(explorer, _job, explorerJob, _resultQueue, _analysisListener); 
     225                        final TaskListener runFinishedListener = new RunNextTaskTaskListener(_taskRunner, closeTask, 
     226                                        explorersDoneTaskListener); 
     227                        final Task runTask = new RunExplorerTask(explorer, metrics, _datastore, _analysisListener); 
    229228 
    230229                        TaskListener referenceDataInitFinishedListener = new RunNextTaskTaskListener(_taskRunner, runTask, 
     
    297296        } 
    298297 
    299         private void registerRowProcessingPublishers(Map<Table, RowProcessingPublisher> rowProcessingPublishers, 
    300                         MergedOutcomeJob mergedOutcomeJob) { 
    301  
    302                 Collection<RowProcessingPublisher> publishers = rowProcessingPublishers.values(); 
    303                 for (RowProcessingPublisher rowProcessingPublisher : publishers) { 
    304                         boolean prerequisiteOutcomesExist = true; 
    305                         MergeInput[] mergeInputs = mergedOutcomeJob.getMergeInputs(); 
    306                         for (MergeInput mergeInput : mergeInputs) { 
    307                                 Outcome prerequisiteOutcome = mergeInput.getOutcome(); 
    308                                 if (!rowProcessingPublisher.containsOutcome(prerequisiteOutcome)) { 
    309                                         prerequisiteOutcomesExist = false; 
    310                                         break; 
    311                                 } 
    312                         } 
    313  
    314                         if (prerequisiteOutcomesExist) { 
    315                                 rowProcessingPublisher.addMergedOutcomeJob(mergedOutcomeJob); 
    316                         } 
    317                 } 
    318         } 
    319  
    320         private void registerRowProcessingPublishers(final Map<Table, RowProcessingPublisher> rowProcessingPublishers, 
    321                         final ConfigurableBeanJob<?> beanJob, final AnalysisListener listener, final TaskRunner taskRunner, 
    322                         final LifeCycleHelper lifeCycleHelper) { 
    323                 final Set<Column> physicalColumns = new HashSet<Column>(); 
    324  
    325                 final InputColumn<?>[] inputColumns = beanJob.getInput(); 
    326                 for (InputColumn<?> inputColumn : inputColumns) { 
    327                         physicalColumns.addAll(_sourceColumnFinder.findOriginatingColumns(inputColumn)); 
    328                 } 
    329                 final Outcome[] requirements = beanJob.getRequirements(); 
    330                 for (Outcome requirement : requirements) { 
    331                         physicalColumns.addAll(_sourceColumnFinder.findOriginatingColumns(requirement)); 
    332                 } 
    333  
    334                 final Column[] physicalColumnsArray = physicalColumns.toArray(new Column[physicalColumns.size()]); 
    335                 final Table[] tables; 
    336                 if (physicalColumns.isEmpty()) { 
    337                         // if not dependent on any specific tables, make component available 
    338                         // for all tables 
    339                         Set<Table> allTables = new HashSet<Table>(); 
    340                         Collection<InputColumn<?>> allSourceColumns = _job.getSourceColumns(); 
    341                         for (InputColumn<?> inputColumn : allSourceColumns) { 
    342                                 allTables.add(inputColumn.getPhysicalColumn().getTable()); 
    343                         } 
    344                         tables = allTables.toArray(new Table[allTables.size()]); 
    345                 } else { 
    346                         tables = MetaModelHelper.getTables(physicalColumnsArray); 
    347                 } 
    348  
    349                 for (Table table : tables) { 
    350                         RowProcessingPublisher rowPublisher = rowProcessingPublishers.get(table); 
    351                         if (rowPublisher == null) { 
    352                                 rowPublisher = new RowProcessingPublisher(_job, table, taskRunner, listener, lifeCycleHelper); 
    353                                 rowProcessingPublishers.put(table, rowPublisher); 
    354                         } 
    355  
    356                         // register the physical columns needed by this job 
    357                         Column[] tableColumns = MetaModelHelper.getTableColumns(table, physicalColumnsArray); 
    358                         rowPublisher.addPhysicalColumns(tableColumns); 
    359  
    360                         // find which input columns (both physical or virtual) are needed by 
    361                         // this per-table instance 
    362                         InputColumn<?>[] localInputColumns = getLocalInputColumns(table, inputColumns); 
    363  
    364                         if (beanJob instanceof AnalyzerJob) { 
    365                                 AnalyzerJob analyzerJob = (AnalyzerJob) beanJob; 
    366                                 Analyzer<?> analyzer = analyzerJob.getDescriptor().newInstance(); 
    367                                 rowPublisher.addRowProcessingAnalyzerBean(analyzer, analyzerJob, localInputColumns); 
    368                         } else if (beanJob instanceof TransformerJob) { 
    369                                 TransformerJob transformerJob = (TransformerJob) beanJob; 
    370                                 Transformer<?> transformer = transformerJob.getDescriptor().newInstance(); 
    371                                 rowPublisher.addTransformerBean(transformer, transformerJob, localInputColumns); 
    372                         } else if (beanJob instanceof FilterJob) { 
    373                                 FilterJob filterJob = (FilterJob) beanJob; 
    374                                 Filter<?> filter = filterJob.getDescriptor().newInstance(); 
    375                                 rowPublisher.addFilterBean(filter, filterJob, localInputColumns); 
    376                         } else { 
    377                                 throw new UnsupportedOperationException("Unsupported job type: " + beanJob); 
    378                         } 
    379                 } 
    380         } 
    381  
    382         private InputColumn<?>[] getLocalInputColumns(Table table, InputColumn<?>[] inputColumns) { 
    383                 if (table == null || inputColumns == null || inputColumns.length == 0) { 
    384                         return new InputColumn<?>[0]; 
    385                 } 
    386                 List<InputColumn<?>> result = new ArrayList<InputColumn<?>>(); 
    387                 for (InputColumn<?> inputColumn : inputColumns) { 
    388                         Set<Column> sourcePhysicalColumns = _sourceColumnFinder.findOriginatingColumns(inputColumn); 
    389                         for (Column physicalColumn : sourcePhysicalColumns) { 
    390                                 if (table.equals(physicalColumn.getTable())) { 
    391                                         result.add(inputColumn); 
    392                                         break; 
    393                                 } 
    394                         } 
    395                 } 
    396                 return result.toArray(new InputColumn<?>[result.size()]); 
    397         } 
    398298} 
  • AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/CompositeAnalysisListener.java

    r2919 r3040  
    3131import org.eobjects.analyzer.result.AnalyzerResult; 
    3232 
    33 import org.eobjects.metamodel.schema.Table; 
    34  
    3533public final class CompositeAnalysisListener implements AnalysisListener { 
    3634 
     
    5048 
    5149        @Override 
    52         public void jobBegin(AnalysisJob job) { 
     50        public void jobBegin(AnalysisJob job, AnalysisJobMetrics metrics) { 
    5351                for (AnalysisListener delegate : _delegates) { 
    54                         delegate.jobBegin(job); 
     52                        delegate.jobBegin(job, metrics); 
    5553                } 
    5654        } 
    5755 
    5856        @Override 
    59         public void jobSuccess(AnalysisJob job) { 
     57        public void jobSuccess(AnalysisJob job, AnalysisJobMetrics metrics) { 
    6058                for (AnalysisListener delegate : _delegates) { 
    61                         delegate.jobSuccess(job); 
     59                        delegate.jobSuccess(job, metrics); 
    6260                } 
    6361        } 
    6462 
    6563        @Override 
    66         public void rowProcessingBegin(AnalysisJob job, Table table, int expectedRows) { 
     64        public void rowProcessingBegin(AnalysisJob job, RowProcessingMetrics metrics) { 
    6765                for (AnalysisListener delegate : _delegates) { 
    68                         delegate.rowProcessingBegin(job, table, expectedRows); 
     66                        delegate.rowProcessingBegin(job, metrics); 
    6967                } 
    7068        } 
    7169 
    7270        @Override 
    73         public void rowProcessingProgress(AnalysisJob job, Table table, int currentRow) { 
     71        public void rowProcessingProgress(AnalysisJob job, RowProcessingMetrics metrics, int currentRow) { 
    7472                for (AnalysisListener delegate : _delegates) { 
    75                         delegate.rowProcessingProgress(job, table, currentRow); 
     73                        delegate.rowProcessingProgress(job, metrics, currentRow); 
    7674                } 
    7775        } 
    7876 
    7977        @Override 
    80         public void rowProcessingSuccess(AnalysisJob job, Table table) { 
     78        public void rowProcessingSuccess(AnalysisJob job, RowProcessingMetrics metrics) { 
    8179                for (AnalysisListener delegate : _delegates) { 
    82                         delegate.rowProcessingSuccess(job, table); 
     80                        delegate.rowProcessingSuccess(job, metrics); 
    8381                } 
    8482        } 
    8583 
    8684        @Override 
    87         public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob) { 
     85        public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob, AnalyzerMetrics metrics) { 
    8886                for (AnalysisListener delegate : _delegates) { 
    89                         delegate.analyzerBegin(job, analyzerJob); 
     87                        delegate.analyzerBegin(job, analyzerJob, metrics); 
    9088                } 
    9189        } 
    9290 
    9391        @Override 
    94         public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob) { 
     92        public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob, ExplorerMetrics metrics) { 
    9593                for (AnalysisListener delegate : _delegates) { 
    96                         delegate.explorerBegin(job, explorerJob); 
     94                        delegate.explorerBegin(job, explorerJob, metrics); 
    9795                } 
    9896        } 
  • AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/DebugLoggingAnalysisListener.java

    r2919 r3040  
    3030import org.slf4j.LoggerFactory; 
    3131 
    32 import org.eobjects.metamodel.schema.Table; 
    33  
    3432/** 
    3533 * AnalysisListener used for DEBUG level logging. This listener is obviously 
     
    5149 
    5250        @Override 
    53         public void jobBegin(AnalysisJob job) { 
     51        public void jobBegin(AnalysisJob job, AnalysisJobMetrics metrics) { 
    5452                logger.debug("jobBegin({})", job); 
    5553        } 
    5654 
    5755        @Override 
    58         public void jobSuccess(AnalysisJob job) { 
     56        public void jobSuccess(AnalysisJob job, AnalysisJobMetrics metrics) { 
    5957                logger.debug("jobSuccess({})", job); 
    6058        } 
    6159 
    6260        @Override 
    63         public void rowProcessingBegin(AnalysisJob job, Table table, int expectedRows) { 
    64                 logger.debug("rowProcessingBegin({}, {}, {})", new Object[] { job, table, expectedRows }); 
     61        public void rowProcessingBegin(AnalysisJob job, RowProcessingMetrics metrics) { 
     62                logger.debug("rowProcessingBegin({}, {})", new Object[] { job, metrics.getTable() }); 
    6563        } 
    6664 
    6765        @Override 
    68         public void rowProcessingProgress(AnalysisJob job, Table table, int currentRow) { 
    69                 logger.debug("rowProcessingProgress({}, {}, {})", new Object[] { job, table, currentRow }); 
     66        public void rowProcessingProgress(AnalysisJob job, RowProcessingMetrics metrics, int currentRow) { 
     67                logger.debug("rowProcessingProgress({}, {}, {})", new Object[] { job, metrics.getTable(), currentRow }); 
    7068        } 
    7169 
    7270        @Override 
    73         public void rowProcessingSuccess(AnalysisJob job, Table table) { 
    74                 logger.debug("rowProcessingSuccess({}, {})", new Object[] { job, table }); 
     71        public void rowProcessingSuccess(AnalysisJob job, RowProcessingMetrics metrics) { 
     72                logger.debug("rowProcessingSuccess({}, {})", new Object[] { job, metrics.getTable() }); 
    7573        } 
    7674 
    7775        @Override 
    78         public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob) { 
     76        public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob, AnalyzerMetrics metrics) { 
    7977                logger.debug("analyzerBegin({}, {})", new Object[] { job, analyzerJob }); 
    8078        } 
    8179 
    8280        @Override 
    83         public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob) { 
     81        public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob, ExplorerMetrics metrics) { 
    8482                logger.debug("explorerBegin({}, {})", new Object[] { job, explorerJob }); 
    8583        } 
  • AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/ErrorAwareAnalysisListener.java

    r2919 r3040  
    3535import org.slf4j.LoggerFactory; 
    3636 
    37 import org.eobjects.metamodel.schema.Table; 
    38  
    3937/** 
    4038 * AnalysisListener that will register errors 
     
    5048 
    5149        @Override 
    52         public void jobBegin(AnalysisJob job) { 
     50        public void jobBegin(AnalysisJob job, AnalysisJobMetrics metrics) { 
    5351        } 
    5452 
    5553        @Override 
    56         public void jobSuccess(AnalysisJob job) { 
     54        public void jobSuccess(AnalysisJob job, AnalysisJobMetrics metrics) { 
    5755        } 
    5856 
     
    113111 
    114112        @Override 
    115         public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob) { 
     113        public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob, ExplorerMetrics metrics) { 
    116114        } 
    117115 
     
    121119 
    122120        @Override 
    123         public void rowProcessingBegin(AnalysisJob job, Table table, int expectedRows) { 
     121        public void rowProcessingBegin(AnalysisJob job, RowProcessingMetrics metrics) { 
    124122        } 
    125123 
    126124        @Override 
    127         public void rowProcessingProgress(AnalysisJob job, Table table, int currentRow) { 
     125        public void rowProcessingProgress(AnalysisJob job, RowProcessingMetrics metrics, int currentRow) { 
    128126        } 
    129127 
    130128        @Override 
    131         public void rowProcessingSuccess(AnalysisJob job, Table table) { 
     129        public void rowProcessingSuccess(AnalysisJob job, RowProcessingMetrics metrics) { 
    132130        } 
    133131 
    134132        @Override 
    135         public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob) { 
     133        public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob, AnalyzerMetrics metrics) { 
    136134        } 
    137135 
  • AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/InfoLoggingAnalysisListener.java

    r2919 r3040  
    3030import org.slf4j.LoggerFactory; 
    3131 
    32 import org.eobjects.metamodel.schema.Table; 
    33  
    3432/** 
    3533 * AnalysisListener used for INFO level logging. This listener will log 
     
    5149 
    5250        @Override 
    53         public void jobBegin(AnalysisJob job) { 
     51        public void jobBegin(AnalysisJob job, AnalysisJobMetrics metrics) { 
    5452                // do nothing 
    5553        } 
    5654 
    5755        @Override 
    58         public void jobSuccess(AnalysisJob job) { 
     56        public void jobSuccess(AnalysisJob job, AnalysisJobMetrics metrics) { 
    5957                // do nothing 
    6058        } 
    6159 
    6260        @Override 
    63         public void rowProcessingBegin(AnalysisJob job, Table table, int expectedRows) { 
    64                 logger.info("Beginning row processing of {} rows in {}", new Object[] { expectedRows, table }); 
     61        public void rowProcessingBegin(AnalysisJob job, RowProcessingMetrics metrics) { 
     62                logger.info("Beginning row processing of {}", metrics.getTable()); 
    6563        } 
    6664 
    6765        @Override 
    68         public void rowProcessingProgress(AnalysisJob job, Table table, int currentRow) { 
     66        public void rowProcessingProgress(AnalysisJob job, RowProcessingMetrics metrics, int currentRow) { 
    6967                if (currentRow > 0 && currentRow % 1000 == 0) { 
    70                         logger.info("Reading row no. {} in {}", new Object[] { currentRow, table.getName() }); 
     68                        logger.info("Reading row no. {} in {}", new Object[] { currentRow, metrics.getTable().getName() }); 
    7169                } 
    7270        } 
     
    7876 
    7977        @Override 
    80         public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob) { 
     78        public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob, ExplorerMetrics metrics) { 
    8179                // do nothing 
    8280        } 
     
    8886 
    8987        @Override 
    90         public void rowProcessingSuccess(AnalysisJob job, Table table) { 
     88        public void rowProcessingSuccess(AnalysisJob job, RowProcessingMetrics metrics) { 
    9189                // do nothing 
    9290        } 
    9391 
    9492        @Override 
    95         public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob) { 
     93        public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob, AnalyzerMetrics metrics) { 
    9694                // do nothing 
    9795        } 
  • AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/RowProcessingPublisher.java

    r3014 r3040  
    2323import java.util.Arrays; 
    2424import java.util.Collection; 
    25 import java.util.Collections; 
    2625import java.util.HashSet; 
    2726import java.util.List; 
     
    3332import org.eobjects.analyzer.beans.api.Filter; 
    3433import org.eobjects.analyzer.beans.api.Transformer; 
    35 import org.eobjects.analyzer.beans.convert.ConvertToNumberTransformer; 
    3634import org.eobjects.analyzer.connection.Datastore; 
    3735import org.eobjects.analyzer.connection.DatastoreConnection; 
     
    6967import org.eobjects.metamodel.schema.Table; 
    7068import org.eobjects.metamodel.util.CollectionUtils; 
     69import org.eobjects.metamodel.util.LazyRef; 
    7170import org.eobjects.metamodel.util.Predicate; 
    7271import org.slf4j.Logger; 
     
    7776        private final static Logger logger = LoggerFactory.getLogger(RowProcessingPublisher.class); 
    7877 
     78        private final AnalysisJob _analysisJob; 
     79        private final Table _table; 
    7980        private final Set<Column> _physicalColumns = new HashSet<Column>(); 
    8081        private final List<RowProcessingConsumer> _consumers = new ArrayList<RowProcessingConsumer>(); 
    81         private final AnalysisJob _job; 
    82         private final Table _table; 
    8382        private final TaskRunner _taskRunner; 
    8483        private final AnalysisListener _analysisListener; 
    8584        private final LifeCycleHelper _lifeCycleHelper; 
    86  
    87         public RowProcessingPublisher(AnalysisJob job, Table table, TaskRunner taskRunner, AnalysisListener analysisListener, 
    88                         LifeCycleHelper lifeCycleHelper) { 
    89                 if (job == null) { 
    90                         throw new IllegalArgumentException("AnalysisJob cannot be null"); 
    91                 } 
     85        private final LazyRef<RowProcessingQueryOptimizer> _queryOptimizerRef; 
     86 
     87        public RowProcessingPublisher(AnalysisJob analysisJob, Table table, TaskRunner taskRunner, 
     88                        AnalysisListener analysisListener, LifeCycleHelper lifeCycleHelper) { 
    9289                if (table == null) { 
    9390                        throw new IllegalArgumentException("Table cannot be null"); 
     
    9996                        throw new IllegalArgumentException("AnalysisListener cannot be null"); 
    10097                } 
    101                 _job = job; 
     98                _analysisJob = analysisJob; 
    10299                _table = table; 
    103100                _taskRunner = taskRunner; 
    104101                _analysisListener = analysisListener; 
    105102                _lifeCycleHelper = lifeCycleHelper; 
     103 
     104                _queryOptimizerRef = new LazyRef<RowProcessingQueryOptimizer>() { 
     105                        @Override 
     106                        protected RowProcessingQueryOptimizer fetch() { 
     107                                final Datastore datastore = _analysisJob.getDatastore(); 
     108                                final DatastoreConnection con = datastore.openConnection(); 
     109                                try { 
     110                                        final DataContext dataContext = con.getDataContext(); 
     111 
     112                                        final Column[] columnArray = _physicalColumns.toArray(new Column[_physicalColumns.size()]); 
     113                                        final Query baseQuery = dataContext.query().from(_table).select(columnArray).toQuery(); 
     114 
     115                                        logger.debug("Base query for row processing: {}", baseQuery); 
     116 
     117                                        final RowProcessingConsumerSorter sorter = new RowProcessingConsumerSorter(_consumers); 
     118                                        final List<RowProcessingConsumer> sortedConsumers = sorter.createProcessOrderedConsumerList(); 
     119                                        if (logger.isDebugEnabled()) { 
     120                                                logger.debug("Row processing order ({} consumers):", sortedConsumers.size()); 
     121                                                int i = 1; 
     122                                                for (RowProcessingConsumer rowProcessingConsumer : sortedConsumers) { 
     123                                                        logger.debug(" {}) {}", i, rowProcessingConsumer); 
     124                                                        i++; 
     125                                                } 
     126                                        } 
     127 
     128                                        final RowProcessingQueryOptimizer optimizer = new RowProcessingQueryOptimizer(datastore, 
     129                                                        sortedConsumers, baseQuery); 
     130                                        return optimizer; 
     131                                } finally { 
     132                                        con.close(); 
     133                                } 
     134                        } 
     135                }; 
     136        } 
     137 
     138        public void initialize() { 
     139                // can safely load query optimizer in separate thread here 
     140                _queryOptimizerRef.requestLoad(); 
    106141        } 
    107142 
     
    116151        } 
    117152 
    118         public void run() { 
     153        private RowProcessingQueryOptimizer getQueryOptimizer() { 
     154                return _queryOptimizerRef.get(); 
     155        } 
     156 
     157        public Query getQuery() { 
     158                return getQueryOptimizer().getOptimizedQuery(); 
     159        } 
     160 
     161        public void run(RowProcessingMetrics rowProcessingMetrics) { 
    119162                for (RowProcessingConsumer rowProcessingConsumer : _consumers) { 
    120163                        if (rowProcessingConsumer instanceof AnalyzerConsumer) { 
    121                                 AnalyzerJob analyzerJob = ((AnalyzerConsumer) rowProcessingConsumer).getComponentJob(); 
    122                                 _analysisListener.analyzerBegin(_job, analyzerJob); 
    123                         } 
    124                 } 
    125  
    126                 final Datastore datastore = _job.getDatastore(); 
     164                                final AnalyzerConsumer analyzerConsumer = (AnalyzerConsumer) rowProcessingConsumer; 
     165                                final AnalyzerJob analyzerJob = analyzerConsumer.getComponentJob(); 
     166                                final AnalyzerMetrics metrics = rowProcessingMetrics.getAnalysisJobMetrics().getAnalyzerMetrics(analyzerJob); 
     167                                _analysisListener.analyzerBegin(_analysisJob, analyzerJob, metrics); 
     168                        } 
     169                } 
     170                final RowProcessingQueryOptimizer queryOptimizer = getQueryOptimizer(); 
     171                final Query finalQuery = queryOptimizer.getOptimizedQuery(); 
     172                final List<RowProcessingConsumer> consumers = queryOptimizer.getOptimizedConsumers(); 
     173                final Collection<? extends Outcome> availableOutcomes = queryOptimizer.getOptimizedAvailableOutcomes(); 
     174 
     175                _analysisListener.rowProcessingBegin(_analysisJob, rowProcessingMetrics); 
     176 
     177                // TODO: Needs to delegate errors downstream 
     178                final RowConsumerTaskListener taskListener = new RowConsumerTaskListener(_analysisJob, _analysisListener, 
     179                                _taskRunner); 
     180                final AtomicInteger rowNumber = new AtomicInteger(0); 
     181 
     182                final Datastore datastore = _analysisJob.getDatastore(); 
    127183                final DatastoreConnection con = datastore.openConnection(); 
    128                 final DataContext dataContext = con.getDataContext(); 
    129  
    130                 final Query finalQuery; 
    131                 final List<RowProcessingConsumer> finalConsumers; 
    132                 final Collection<? extends Outcome> availableOutcomes; 
    133                 { 
    134                         final Column[] columnArray = _physicalColumns.toArray(new Column[_physicalColumns.size()]); 
    135                         final Query baseQuery = dataContext.query().from(_table).select(columnArray).toQuery(); 
    136  
    137                         logger.debug("Base query for row processing: {}", baseQuery); 
    138  
    139                         final RowProcessingConsumerSorter sorter = new RowProcessingConsumerSorter(_consumers); 
    140                         final List<RowProcessingConsumer> sortedConsumers = sorter.createProcessOrderedConsumerList(); 
    141                         if (logger.isDebugEnabled()) { 
    142                                 logger.debug("Row processing order ({} consumers):", sortedConsumers.size()); 
    143                                 int i = 1; 
    144                                 for (RowProcessingConsumer rowProcessingConsumer : sortedConsumers) { 
    145                                         logger.debug(" {}) {}", i, rowProcessingConsumer); 
    146                                         i++; 
     184 
     185                try { 
     186                        final DataContext dataContext = con.getDataContext(); 
     187                        final DataSet dataSet = dataContext.executeQuery(finalQuery); 
     188 
     189                        // represents the distinct count of rows as well as the number of 
     190                        // tasks 
     191                        // to execute 
     192                        int numTasks = 0; 
     193 
     194                        try { 
     195 
     196                                while (dataSet.next()) { 
     197                                        if (taskListener.isErrornous()) { 
     198                                                break; 
     199                                        } 
     200                                        Row metaModelRow = dataSet.getRow(); 
     201                                        ConsumeRowTask task = new ConsumeRowTask(consumers, rowProcessingMetrics, metaModelRow, rowNumber, 
     202                                                        _analysisListener, availableOutcomes); 
     203                                        _taskRunner.run(task, taskListener); 
     204                                        numTasks++; 
    147205                                } 
    148                         } 
    149  
    150                         final RowProcessingQueryOptimizer optimizer = new RowProcessingQueryOptimizer(datastore, sortedConsumers, 
    151                                         baseQuery); 
    152                         if (optimizer.isOptimizable()) { 
    153                                 finalQuery = optimizer.getOptimizedQuery(); 
    154                                 finalConsumers = optimizer.getOptimizedConsumers(); 
    155                                 availableOutcomes = optimizer.getOptimizedAvailableOutcomes(); 
    156                                 logger.info("Base query was optimizable to: {}, (maxrows={})", finalQuery, finalQuery.getMaxRows()); 
    157                         } else { 
    158                                 finalQuery = baseQuery; 
    159                                 finalConsumers = sortedConsumers; 
    160                                 availableOutcomes = Collections.emptyList(); 
    161                         } 
    162                 } 
    163  
    164                 int expectedRows = -1; 
    165                 { 
    166                         final Query countQuery = finalQuery.clone(); 
    167                         countQuery.setMaxRows(null); 
    168                         countQuery.getSelectClause().removeItems(); 
    169                         countQuery.selectCount(); 
    170                         countQuery.getSelectClause().getItem(0).setFunctionApproximationAllowed(true); 
    171                         final DataSet countDataSet = dataContext.executeQuery(countQuery); 
    172                         if (countDataSet.next()) { 
    173                                 Number count = ConvertToNumberTransformer.transformValue(countDataSet.getRow().getValue(0)); 
    174                                 if (count != null) { 
    175                                         expectedRows = count.intValue(); 
    176                                 } 
    177                         } 
    178                         Integer maxRows = finalQuery.getMaxRows(); 
    179                         if (maxRows != null) { 
    180                                 expectedRows = Math.min(expectedRows, maxRows.intValue()); 
    181                         } 
    182                 } 
    183  
    184                 _analysisListener.rowProcessingBegin(_job, _table, expectedRows); 
    185  
    186                 // TODO: Needs to delegate errors downstream 
    187                 final RowConsumerTaskListener taskListener = new RowConsumerTaskListener(_job, _analysisListener, _taskRunner); 
    188                 final AtomicInteger rowNumber = new AtomicInteger(0); 
    189                 final DataSet dataSet = dataContext.executeQuery(finalQuery); 
    190  
    191                 // represents the distinct count of rows as well as the number of tasks 
    192                 // to execute 
    193                 int numTasks = 0; 
    194  
    195                 while (dataSet.next()) { 
    196                         if (taskListener.isErrornous()) { 
    197                                 break; 
    198                         } 
    199                         Row metaModelRow = dataSet.getRow(); 
    200                         ConsumeRowTask task = new ConsumeRowTask(finalConsumers, _table, metaModelRow, rowNumber, _job, 
    201                                         _analysisListener, availableOutcomes); 
    202                         _taskRunner.run(task, taskListener); 
    203                         numTasks++; 
    204                 } 
    205  
    206                 taskListener.awaitTasks(numTasks); 
    207  
    208                 dataSet.close(); 
    209                 con.close(); 
     206 
     207                        } finally { 
     208                                dataSet.close(); 
     209                        } 
     210                        taskListener.awaitTasks(numTasks); 
     211 
     212                } finally { 
     213                        con.close(); 
     214                } 
    210215 
    211216                if (!taskListener.isErrornous()) { 
    212                         _analysisListener.rowProcessingSuccess(_job, _table); 
     217                        _analysisListener.rowProcessingSuccess(_analysisJob, rowProcessingMetrics); 
    213218                } 
    214219        } 
    215220 
    216221        public void addRowProcessingAnalyzerBean(Analyzer<?> analyzer, AnalyzerJob analyzerJob, InputColumn<?>[] inputColumns) { 
    217                 addConsumer(new AnalyzerConsumer(_job, analyzer, analyzerJob, inputColumns, _analysisListener)); 
     222                addConsumer(new AnalyzerConsumer(_analysisJob, analyzer, analyzerJob, inputColumns, _analysisListener)); 
    218223        } 
    219224 
    220225        public void addTransformerBean(Transformer<?> transformer, TransformerJob transformerJob, InputColumn<?>[] inputColumns) { 
    221                 addConsumer(new TransformerConsumer(_job, transformer, transformerJob, inputColumns, _analysisListener)); 
     226                addConsumer(new TransformerConsumer(_analysisJob, transformer, transformerJob, inputColumns, _analysisListener)); 
    222227        } 
    223228 
    224229        public void addFilterBean(Filter<?> filter, FilterJob filterJob, InputColumn<?>[] inputColumns) { 
    225                 addConsumer(new FilterConsumer(_job, filter, filterJob, inputColumns, _analysisListener)); 
     230                addConsumer(new FilterConsumer(_analysisJob, filter, filterJob, inputColumns, _analysisListener)); 
    226231        } 
    227232 
     
    250255 
    251256        public List<TaskRunnable> createInitialTasks(TaskRunner taskRunner, Queue<JobAndResult> resultQueue, 
    252                         TaskListener rowProcessorPublishersTaskListener, Datastore datastore) { 
     257                        TaskListener rowProcessorPublishersTaskListener, Datastore datastore, AnalysisJobMetrics analysisJobMetrics) { 
    253258 
    254259                final List<RowProcessingConsumer> configurableConsumers = CollectionUtils.filter(_consumers, 
     
    278283                final TaskListener runCompletionListener = new ForkTaskListener("run row processing", taskRunner, closeTasks); 
    279284 
    280                 final RunRowProcessingPublisherTask runTask = new RunRowProcessingPublisherTask(this); 
     285                final RowProcessingMetrics rowProcessingMetrics = analysisJobMetrics.getRowProcessingMetrics(_table); 
     286                final RunRowProcessingPublisherTask runTask = new RunRowProcessingPublisherTask(this, rowProcessingMetrics); 
    281287 
    282288                final TaskListener referenceDataInitFinishedListener = new ForkTaskListener("Initialize row consumers", taskRunner, 
     
    300306                        AnalyzerConsumer analyzerConsumer = (AnalyzerConsumer) consumer; 
    301307                        Analyzer<?> analyzer = analyzerConsumer.getComponent(); 
    302                         return new CollectResultsTask(analyzer, _job, consumer.getComponentJob(), resultQueue, _analysisListener); 
     308                        return new CollectResultsTask(analyzer, _analysisJob, consumer.getComponentJob(), resultQueue, _analysisListener); 
    303309                } else { 
    304310                        throw new IllegalStateException("Unknown consumer type: " + consumer); 
     
    321327                return "RowProcessingPublisher[table=" + _table.getQualifiedLabel() + ", consumers=" + _consumers.size() + "]"; 
    322328        } 
     329 
     330        public AnalyzerJob[] getAnalyzerJobs() { 
     331                List<AnalyzerJob> analyzerJobs = new ArrayList<AnalyzerJob>(); 
     332                for (RowProcessingConsumer consumer : _consumers) { 
     333                        if (consumer instanceof AnalyzerConsumer) { 
     334                                AnalyzerJob analyzerJob = ((AnalyzerConsumer) consumer).getComponentJob(); 
     335                                analyzerJobs.add(analyzerJob); 
     336                        } 
     337                } 
     338                return analyzerJobs.toArray(new AnalyzerJob[analyzerJobs.size()]); 
     339        } 
    323340} 
  • AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/tasks/ConsumeRowTask.java

    r2706 r3040  
    2727import org.eobjects.analyzer.data.InputRow; 
    2828import org.eobjects.analyzer.data.MetaModelInputRow; 
    29 import org.eobjects.analyzer.job.AnalysisJob; 
    3029import org.eobjects.analyzer.job.Outcome; 
    3130import org.eobjects.analyzer.job.runner.AnalysisListener; 
     
    3332import org.eobjects.analyzer.job.runner.OutcomeSinkImpl; 
    3433import org.eobjects.analyzer.job.runner.RowProcessingConsumer; 
     34import org.eobjects.analyzer.job.runner.RowProcessingMetrics; 
    3535import org.eobjects.metamodel.data.Row; 
    36 import org.eobjects.metamodel.schema.Table; 
    3736 
     37/** 
     38 * A {@link Task} that dispatches ("consumes") a record to all relevant 
     39 * {@link RowProcessingConsumer}s (eg. analyzerbeans components). 
     40 *  
     41 * @author Kasper SÞrensen 
     42 */ 
    3843public final class ConsumeRowTask implements Task { 
    3944 
    4045        private final Iterable<RowProcessingConsumer> _consumers; 
    41         private final Table _table; 
     46        private final RowProcessingMetrics _rowProcessingMetrics; 
    4247        private final Row _row; 
    4348        private final AnalysisListener _analysisListener; 
    44         private final AnalysisJob _job; 
    4549        private final AtomicInteger _rowCounter; 
    4650        private final Collection<? extends Outcome> _initialOutcomes; 
     
    4953         *  
    5054         * @param consumers 
    51          * @param table 
     55         * @param rowProcessingMetrics 
    5256         * @param row 
    5357         * @param rowCounter 
    54          * @param job 
    5558         * @param analysisListener 
    5659         * @param initialOutcomes 
     
    5861         *            will contain query-optimized outcomes) 
    5962         */ 
    60         public ConsumeRowTask(Iterable<RowProcessingConsumer> consumers, Table table, Row row, AtomicInteger rowCounter, 
    61                         AnalysisJob job, AnalysisListener analysisListener, Collection<? extends Outcome> initialOutcomes) { 
     63        public ConsumeRowTask(Iterable<RowProcessingConsumer> consumers, RowProcessingMetrics rowProcessingMetrics, Row row, 
     64                        AtomicInteger rowCounter, AnalysisListener analysisListener, Collection<? extends Outcome> initialOutcomes) { 
    6265                _consumers = consumers; 
    63                 _table = table; 
     66                _rowProcessingMetrics = rowProcessingMetrics; 
    6467                _row = row; 
    6568                _rowCounter = rowCounter; 
    66                 _job = job; 
    6769                _analysisListener = analysisListener; 
    6870                _initialOutcomes = initialOutcomes; 
     
    8688                        } 
    8789                } 
    88                 _analysisListener.rowProcessingProgress(_job, _table, rowNumber); 
     90                _analysisListener.rowProcessingProgress(_rowProcessingMetrics.getAnalysisJobMetrics().getAnalysisJob(), 
     91                                _rowProcessingMetrics, rowNumber); 
    8992        } 
    9093 
  • AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/tasks/RunExplorerTask.java

    r3014 r3040  
    2323import org.eobjects.analyzer.connection.Datastore; 
    2424import org.eobjects.analyzer.connection.DatastoreConnection; 
    25 import org.eobjects.analyzer.job.AnalysisJob; 
    26 import org.eobjects.analyzer.job.ExplorerJob; 
    2725import org.eobjects.analyzer.job.runner.AnalysisListener; 
     26import org.eobjects.analyzer.job.runner.ExplorerMetrics; 
    2827import org.eobjects.metamodel.DataContext; 
    2928import org.slf4j.Logger; 
     
    3534 
    3635        private final Explorer<?> _explorer; 
    37         private final AnalysisJob _job; 
    38         private final ExplorerJob _explorerJob; 
    3936        private final Datastore _datastore; 
    4037        private final AnalysisListener _analysisListener; 
     38        private final ExplorerMetrics _explorerMetrics; 
    4139 
    42         public RunExplorerTask(Explorer<?> explorer, AnalysisJob job, ExplorerJob explorerJob, Datastore datastore, 
     40        public RunExplorerTask(Explorer<?> explorer, ExplorerMetrics explorerMetrics, Datastore datastore, 
    4341                        AnalysisListener analysisListener) { 
    4442                _explorer = explorer; 
    45                 _job = job; 
    46                 _explorerJob = explorerJob; 
     43                _explorerMetrics = explorerMetrics; 
    4744                _datastore = datastore; 
    4845                _analysisListener = analysisListener; 
     
    5451 
    5552                if (_analysisListener != null) { 
    56                         _analysisListener.explorerBegin(_job, _explorerJob); 
     53                        _analysisListener.explorerBegin(_explorerMetrics.getAnalysisJobMetrics().getAnalysisJob(), 
     54                                        _explorerMetrics.getExplorerJob(), _explorerMetrics); 
    5755                } 
    5856                DatastoreConnection con = _datastore.openConnection(); 
  • AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/tasks/RunRowProcessingPublisherTask.java

    r1260 r3040  
    2020package org.eobjects.analyzer.job.tasks; 
    2121 
     22import org.eobjects.analyzer.job.runner.RowProcessingMetrics; 
    2223import org.eobjects.analyzer.job.runner.RowProcessingPublisher; 
    2324import org.slf4j.Logger; 
     
    2829        private final Logger logger = LoggerFactory.getLogger(getClass()); 
    2930 
    30         private final RowProcessingPublisher _rowProcessingPublisher; 
     31        private final RowProcessingPublisher _publisher; 
     32        private final RowProcessingMetrics _metrics; 
    3133 
    32         public RunRowProcessingPublisherTask(RowProcessingPublisher rowProcessingPublisher) { 
    33                 _rowProcessingPublisher = rowProcessingPublisher; 
     34        public RunRowProcessingPublisherTask(RowProcessingPublisher publisher, RowProcessingMetrics metrics) { 
     35                _publisher = publisher; 
     36                _metrics = metrics; 
    3437        } 
    3538 
     
    3841                logger.debug("execute()"); 
    3942 
    40                 _rowProcessingPublisher.run(); 
     43                _publisher.run(_metrics); 
    4144        } 
    4245 
  • AnalyzerBeans/trunk/core/src/test/java/org/eobjects/analyzer/test/full/scenarios/PatternFinderAndStringAnalyzerDrillToDetailTest.java

    r2749 r3040  
    6666                AnalysisJobBuilder ajb = new AnalysisJobBuilder(configuration); 
    6767                ajb.setDatastoreConnection(con); 
    68                  
     68 
    6969                Table table = dc.getDefaultSchema().getTableByName("EMPLOYEES"); 
    7070                assertNotNull(table); 
     
    8282                                EmailStandardizerTransformer.class).addInputColumn(emailInputColumn); 
    8383 
    84                 AnalyzerJobBuilder<PatternFinderAnalyzer> pf = ajb 
    85                                 .addAnalyzer(PatternFinderAnalyzer.class); 
     84                AnalyzerJobBuilder<PatternFinderAnalyzer> pf = ajb.addAnalyzer(PatternFinderAnalyzer.class); 
    8685                InputColumn<?> jobtitleInputColumn = ajb.getSourceColumnByName("JOBTITLE"); 
    8786                pf.addInputColumn(jobtitleInputColumn); 
  • DataCleaner/trunk/core/src/main/java/org/eobjects/datacleaner/util/AnalysisRunnerSwingWorker.java

    r3034 r3040  
    3434import org.eobjects.analyzer.job.FilterJob; 
    3535import org.eobjects.analyzer.job.TransformerJob; 
     36import org.eobjects.analyzer.job.runner.AnalysisJobMetrics; 
    3637import org.eobjects.analyzer.job.runner.AnalysisListener; 
    3738import org.eobjects.analyzer.job.runner.AnalysisResultFuture; 
    3839import org.eobjects.analyzer.job.runner.AnalysisRunner; 
    3940import org.eobjects.analyzer.job.runner.AnalysisRunnerImpl; 
     41import org.eobjects.analyzer.job.runner.AnalyzerMetrics; 
     42import org.eobjects.analyzer.job.runner.ExplorerMetrics; 
     43import org.eobjects.analyzer.job.runner.RowProcessingMetrics; 
    4044import org.eobjects.analyzer.result.AnalyzerResult; 
    4145import org.eobjects.analyzer.util.SourceColumnFinder; 
     
    7882 
    7983        @Override 
    80         public void jobBegin(AnalysisJob job) { 
     84        public void jobBegin(AnalysisJob job, AnalysisJobMetrics metrics) { 
    8185                String now = new DateTime().toString(DateTimeFormat.fullTime()); 
    8286                _progressInformationPanel.addUserLog("Job begin (" + now + ")"); 
     
    8488 
    8589        @Override 
    86         public void jobSuccess(AnalysisJob job) { 
     90        public void jobSuccess(AnalysisJob job, AnalysisJobMetrics metrics) { 
    8791                String now = new DateTime().toString(DateTimeFormat.fullTime()); 
    8892                _progressInformationPanel.addUserLog("Job success (" + now + ")"); 
     
    9195 
    9296        @Override 
    93         public void rowProcessingBegin(final AnalysisJob job, final Table table, final int expectedRows) { 
     97        public void rowProcessingBegin(final AnalysisJob job, final RowProcessingMetrics metrics) { 
     98                final int expectedRows = metrics.getExpectedRows(); 
     99                final Table table = metrics.getTable(); 
    94100                if (expectedRows == -1) { 
    95101                        _progressInformationPanel.addUserLog("Starting row processing for " + table.getQualifiedLabel()); 
     
    102108 
    103109        @Override 
    104         public void rowProcessingProgress(AnalysisJob job, final Table table, final int currentRow) { 
    105                 _progressInformationPanel.updateProgress(table, currentRow); 
     110        public void rowProcessingProgress(AnalysisJob job, final RowProcessingMetrics metrics, final int currentRow) { 
     111                _progressInformationPanel.updateProgress(metrics.getTable(), currentRow); 
    106112        } 
    107113 
    108114        @Override 
    109         public void rowProcessingSuccess(AnalysisJob job, final Table table) { 
     115        public void rowProcessingSuccess(AnalysisJob job, final RowProcessingMetrics metrics) { 
    110116                String now = new DateTime().toString(DateTimeFormat.fullTime()); 
    111                 _progressInformationPanel.addUserLog("Row processing for " + table.getQualifiedLabel() + " finished (" + now 
    112                                 + "). Generating results ..."); 
     117                _progressInformationPanel.addUserLog("Row processing for " + metrics.getTable().getQualifiedLabel() + " finished (" 
     118                                + now + "). Generating results ..."); 
    113119        } 
    114120 
    115121        @Override 
    116         public void analyzerBegin(AnalysisJob job, final AnalyzerJob analyzerJob) { 
     122        public void analyzerBegin(AnalysisJob job, final AnalyzerJob analyzerJob, AnalyzerMetrics metrics) { 
    117123                _progressInformationPanel.addUserLog("Starting analyzer '" + LabelUtils.getLabel(analyzerJob) + "'"); 
    118124        } 
     
    174180 
    175181        @Override 
    176         public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob) { 
     182        public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob, ExplorerMetrics metrics) { 
    177183                _progressInformationPanel.addUserLog("Starting explorer '" + LabelUtils.getLabel(explorerJob) + "'"); 
    178184        } 
Note: See TracChangeset for help on using the changeset viewer.