Changeset 3040
- Timestamp:
- 01/16/12 14:54:39 (4 months ago)
- Files:
-
- 9 added
- 15 edited
-
AnalyzerBeans/trunk/cli/src/main/java/org/eobjects/analyzer/cli/CliProgressAnalysisListener.java (modified) (4 diffs)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/ExplorerJob.java (modified) (1 diff)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/concurrent/JobCompletionTaskListener.java (modified) (4 diffs)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/AnalysisJobMetrics.java (added)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/AnalysisJobMetricsImpl.java (added)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/AnalysisListener.java (modified) (3 diffs)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/AnalysisRunnerJobDelegate.java (modified) (11 diffs)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/AnalyzerMetrics.java (added)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/AnalyzerMetricsImpl.java (added)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/CompositeAnalysisListener.java (modified) (2 diffs)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/DebugLoggingAnalysisListener.java (modified) (2 diffs)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/ErrorAwareAnalysisListener.java (modified) (4 diffs)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/ExplorerMetrics.java (added)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/ExplorerMetricsImpl.java (added)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/InfoLoggingAnalysisListener.java (modified) (4 diffs)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/RowProcessingMetrics.java (added)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/RowProcessingMetricsImpl.java (added)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/RowProcessingPublisher.java (modified) (10 diffs)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/RowProcessingPublishers.java (added)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/tasks/ConsumeRowTask.java (modified) (5 diffs)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/tasks/RunExplorerTask.java (modified) (3 diffs)
-
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/tasks/RunRowProcessingPublisherTask.java (modified) (3 diffs)
-
AnalyzerBeans/trunk/core/src/test/java/org/eobjects/analyzer/test/full/scenarios/PatternFinderAndStringAnalyzerDrillToDetailTest.java (modified) (2 diffs)
-
DataCleaner/trunk/core/src/main/java/org/eobjects/datacleaner/util/AnalysisRunnerSwingWorker.java (modified) (6 diffs)
Legend:
- Unmodified
- Added
- Removed
-
AnalyzerBeans/trunk/cli/src/main/java/org/eobjects/analyzer/cli/CliProgressAnalysisListener.java
r2919 r3040 30 30 import org.eobjects.analyzer.job.FilterJob; 31 31 import org.eobjects.analyzer.job.TransformerJob; 32 import org.eobjects.analyzer.job.runner.AnalysisJobMetrics; 32 33 import org.eobjects.analyzer.job.runner.AnalysisListener; 34 import org.eobjects.analyzer.job.runner.AnalyzerMetrics; 35 import org.eobjects.analyzer.job.runner.ExplorerMetrics; 36 import org.eobjects.analyzer.job.runner.RowProcessingMetrics; 33 37 import org.eobjects.analyzer.result.AnalyzerResult; 34 38 … … 40 44 41 45 @Override 42 public void jobBegin(AnalysisJob job ) {46 public void jobBegin(AnalysisJob job, AnalysisJobMetrics metrics) { 43 47 } 44 48 45 49 @Override 46 public void jobSuccess(AnalysisJob job ) {50 public void jobSuccess(AnalysisJob job, AnalysisJobMetrics metrics) { 47 51 } 48 52 49 53 @Override 50 public void rowProcessingBegin(AnalysisJob job, Table table, int expectedRows) { 51 System.out.println("Analyzing " + expectedRows + " rows from table: " + table.getName()); 54 public void rowProcessingBegin(AnalysisJob job, RowProcessingMetrics metrics) { 55 Table table = metrics.getTable(); 56 System.out.println("Analyzing rows from table: " + table.getName()); 52 57 rowCounts.put(table, new AtomicInteger(0)); 53 58 } 54 59 55 60 @Override 56 public void rowProcessingProgress(AnalysisJob job, Table table, int currentRow) { 61 public void rowProcessingProgress(AnalysisJob job, RowProcessingMetrics metrics, int currentRow) { 62 Table table = metrics.getTable(); 57 63 AtomicInteger rowCount = rowCounts.get(table); 58 64 if (rowCount != null) { … … 68 74 69 75 @Override 70 public void rowProcessingSuccess(AnalysisJob job, Table table) {71 System.out.println("Done processing rows from table: " + table.getName());76 public void rowProcessingSuccess(AnalysisJob job, RowProcessingMetrics metrics) { 77 System.out.println("Done processing rows from table: " + metrics.getTable().getName()); 72 78 } 73 79 74 80 @Override 75 public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob ) {81 public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob, AnalyzerMetrics metrics) { 76 82 } 77 83 … … 97 103 98 104 @Override 99 public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob ) {105 public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob, ExplorerMetrics metrics) { 100 106 } 101 107 -
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/ExplorerJob.java
r2631 r3040 23 23 24 24 public interface ExplorerJob extends ComponentJob { 25 25 26 26 public ExplorerBeanDescriptor<?> getDescriptor(); 27 27 -
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/concurrent/JobCompletionTaskListener.java
r2648 r3040 23 23 import java.util.concurrent.TimeUnit; 24 24 25 import org.eobjects.analyzer.job. AnalysisJob;25 import org.eobjects.analyzer.job.runner.AnalysisJobMetrics; 26 26 import org.eobjects.analyzer.job.runner.AnalysisListener; 27 27 import org.eobjects.analyzer.job.tasks.Task; … … 40 40 41 41 private final CountDownLatch _countDownLatch; 42 private final AnalysisJob _job;43 42 private final AnalysisListener _analysisListener; 43 private final AnalysisJobMetrics _analysisJobMetrics; 44 44 45 public JobCompletionTaskListener(AnalysisJob job, AnalysisListener analysisListener, int callablesToWaitFor) { 46 _job = job; 45 public JobCompletionTaskListener(AnalysisJobMetrics analysisJobMetrics, AnalysisListener analysisListener, 46 int callablesToWaitFor) { 47 _analysisJobMetrics = analysisJobMetrics; 47 48 _analysisListener = analysisListener; 48 49 _countDownLatch = new CountDownLatch(callablesToWaitFor); … … 71 72 _countDownLatch.countDown(); 72 73 if (_countDownLatch.getCount() == 0) { 73 _analysisListener.jobSuccess(_ job);74 _analysisListener.jobSuccess(_analysisJobMetrics.getAnalysisJob(), _analysisJobMetrics); 74 75 } 75 76 } … … 78 79 public void onError(Task task, Throwable throwable) { 79 80 logger.debug("onError(...)"); 80 _analysisListener.errorUknown(_ job, throwable);81 _analysisListener.errorUknown(_analysisJobMetrics.getAnalysisJob(), throwable); 81 82 _countDownLatch.countDown(); 82 83 } -
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/AnalysisListener.java
r2919 r3040 27 27 import org.eobjects.analyzer.job.TransformerJob; 28 28 import org.eobjects.analyzer.result.AnalyzerResult; 29 import org.eobjects.metamodel.schema.Table;30 29 31 30 /** … … 39 38 public interface AnalysisListener { 40 39 41 public void jobBegin(AnalysisJob job );40 public void jobBegin(AnalysisJob job, AnalysisJobMetrics metrics); 42 41 43 public void jobSuccess(AnalysisJob job );42 public void jobSuccess(AnalysisJob job, AnalysisJobMetrics metrics); 44 43 45 44 /** … … 51 50 * the count could not be determined. 52 51 */ 53 public void rowProcessingBegin(AnalysisJob job, Table table, int expectedRows);52 public void rowProcessingBegin(AnalysisJob job, RowProcessingMetrics metrics); 54 53 55 public void rowProcessingProgress(AnalysisJob job, Table table, int currentRow);54 public void rowProcessingProgress(AnalysisJob job, RowProcessingMetrics metrics, int currentRow); 56 55 57 public void rowProcessingSuccess(AnalysisJob job, Table table);56 public void rowProcessingSuccess(AnalysisJob job, RowProcessingMetrics metrics); 58 57 59 public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob );58 public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob, AnalyzerMetrics metrics); 60 59 61 public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob );60 public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob, ExplorerMetrics metrics); 62 61 63 62 public void analyzerSuccess(AnalysisJob job, AnalyzerJob analyzerJob, AnalyzerResult result); -
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/AnalysisRunnerJobDelegate.java
r3014 r3040 22 22 import java.util.ArrayList; 23 23 import java.util.Collection; 24 import java.util.HashMap;25 import java.util.HashSet;26 24 import java.util.List; 27 import java.util.Map;28 25 import java.util.Queue; 29 import java.util.Set; 30 31 import org.eobjects.analyzer.beans.api.Analyzer; 26 32 27 import org.eobjects.analyzer.beans.api.Explorer; 33 import org.eobjects.analyzer.beans.api.Filter;34 import org.eobjects.analyzer.beans.api.Transformer;35 28 import org.eobjects.analyzer.configuration.AnalyzerBeansConfiguration; 36 29 import org.eobjects.analyzer.configuration.InjectionManager; … … 46 39 import org.eobjects.analyzer.job.MergeInput; 47 40 import org.eobjects.analyzer.job.MergedOutcomeJob; 48 import org.eobjects.analyzer.job.Outcome;49 41 import org.eobjects.analyzer.job.TransformerJob; 50 42 import org.eobjects.analyzer.job.concurrent.ForkTaskListener; … … 65 57 import org.eobjects.analyzer.lifecycle.LifeCycleHelper; 66 58 import org.eobjects.analyzer.util.SourceColumnFinder; 67 import org.eobjects.metamodel.MetaModelHelper;68 import org.eobjects.metamodel.schema.Column;69 59 import org.eobjects.metamodel.schema.Table; 70 60 import org.slf4j.Logger; … … 88 78 private final AnalysisListener _analysisListener; 89 79 private final Queue<JobAndResult> _resultQueue; 90 private final JobCompletionTaskListener _jobCompletionTaskListener;91 80 private final ErrorAware _errorAware; 92 81 private final Datastore _datastore; … … 109 98 _sourceColumnFinder.addSources(_job); 110 99 111 // A task listener that will register either succesfull executions or112 // unexpected errors (which will be delegated to the errorListener)113 _jobCompletionTaskListener = new JobCompletionTaskListener(job, analysisListener, 2);114 100 _errorAware = errorAware; 115 101 … … 129 115 */ 130 116 public AnalysisResultFuture run() { 131 _analysisListener.jobBegin(_job); 132 133 validateSingleTableInput(_transformerJobs); 134 validateSingleTableInput(_filterJobs); 135 validateSingleTableInputForMergedOutcomes(_mergedOutcomeJobs); 136 137 validateSingleTableInput(_analyzerJobs); 138 139 // the injection manager is job scoped 140 final InjectionManager injectionManager = _configuration.getInjectionManagerFactory().getInjectionManager(_job); 141 142 // at this point we are done validating the job, it will run. 143 scheduleExplorers(new LifeCycleHelper(injectionManager, new ReferenceDataActivationManager())); 144 scheduleRowProcessing(new LifeCycleHelper(injectionManager, new ReferenceDataActivationManager())); 145 146 return new AnalysisResultFutureImpl(_resultQueue, _jobCompletionTaskListener, _errorAware); 117 try { 118 // the injection manager is job scoped 119 final InjectionManager injectionManager = _configuration.getInjectionManagerFactory().getInjectionManager(_job); 120 121 final LifeCycleHelper explorerLifeCycleHelper = new LifeCycleHelper(injectionManager, 122 new ReferenceDataActivationManager()); 123 final LifeCycleHelper rowProcessingLifeCycleHelper = new LifeCycleHelper(injectionManager, 124 new ReferenceDataActivationManager()); 125 126 final RowProcessingPublishers publishers = new RowProcessingPublishers(_job, _analysisListener, _taskRunner, 127 rowProcessingLifeCycleHelper, _sourceColumnFinder); 128 129 final AnalysisJobMetrics analysisJobMetrics = new AnalysisJobMetricsImpl(_job, publishers); 130 131 // A task listener that will register either succesfull executions 132 // or 133 // unexpected errors (which will be delegated to the errorListener) 134 JobCompletionTaskListener jobCompletionTaskListener = new JobCompletionTaskListener(analysisJobMetrics, 135 _analysisListener, 2); 136 137 _analysisListener.jobBegin(_job, analysisJobMetrics); 138 139 validateSingleTableInput(_transformerJobs); 140 validateSingleTableInput(_filterJobs); 141 validateSingleTableInputForMergedOutcomes(_mergedOutcomeJobs); 142 validateSingleTableInput(_analyzerJobs); 143 144 // at this point we are done validating the job, it will run. 145 scheduleExplorers(explorerLifeCycleHelper, jobCompletionTaskListener, analysisJobMetrics); 146 scheduleRowProcessing(publishers, rowProcessingLifeCycleHelper, jobCompletionTaskListener, analysisJobMetrics); 147 148 return new AnalysisResultFutureImpl(_resultQueue, jobCompletionTaskListener, _errorAware); 149 } catch (RuntimeException e) { 150 _analysisListener.errorUknown(_job, e); 151 throw e; 152 } 153 147 154 } 148 155 … … 150 157 * Starts row processing job flows. 151 158 * 159 * @param publishers 160 * @param analysisJobMetrics 161 * 152 162 * @param injectionManager 153 163 */ 154 private void scheduleRowProcessing(LifeCycleHelper lifeCycleHelper) { 155 final Map<Table, RowProcessingPublisher> rowProcessingPublishers = new HashMap<Table, RowProcessingPublisher>(); 156 for (FilterJob filterJob : _filterJobs) { 157 registerRowProcessingPublishers(rowProcessingPublishers, filterJob, _analysisListener, _taskRunner, 158 lifeCycleHelper); 159 } 160 161 for (MergedOutcomeJob mergedOutcomeJob : _mergedOutcomeJobs) { 162 registerRowProcessingPublishers(rowProcessingPublishers, mergedOutcomeJob); 163 } 164 165 for (TransformerJob transformerJob : _transformerJobs) { 166 registerRowProcessingPublishers(rowProcessingPublishers, transformerJob, _analysisListener, _taskRunner, 167 lifeCycleHelper); 168 } 169 for (AnalyzerJob analyzerJob : _analyzerJobs) { 170 registerRowProcessingPublishers(rowProcessingPublishers, analyzerJob, _analysisListener, _taskRunner, 171 lifeCycleHelper); 172 } 173 174 logger.info("Created {} row processor publishers", rowProcessingPublishers.size()); 164 private void scheduleRowProcessing(RowProcessingPublishers publishers, LifeCycleHelper lifeCycleHelper, 165 JobCompletionTaskListener jobCompletionTaskListener, AnalysisJobMetrics analysisJobMetrics) { 166 167 logger.info("Created {} row processor publishers", publishers.size()); 175 168 176 169 final List<TaskRunnable> finalTasks = new ArrayList<TaskRunnable>(2); 177 finalTasks.add(new TaskRunnable(null, _jobCompletionTaskListener));170 finalTasks.add(new TaskRunnable(null, jobCompletionTaskListener)); 178 171 finalTasks.add(new TaskRunnable(null, new CloseReferenceDataTaskListener(lifeCycleHelper))); 179 172 … … 181 174 finalTasks); 182 175 183 final TaskListener rowProcessorPublishersDoneCompletionListener = new JoinTaskListener( 184 rowProcessingPublishers.size(), finalTaskListener); 185 186 for (RowProcessingPublisher rowProcessingPublisher : rowProcessingPublishers.values()) { 187 List<TaskRunnable> initTasks = rowProcessingPublisher.createInitialTasks(_taskRunner, _resultQueue, 188 rowProcessorPublishersDoneCompletionListener, _datastore); 176 final TaskListener rowProcessorPublishersDoneCompletionListener = new JoinTaskListener(publishers.size(), 177 finalTaskListener); 178 179 final Table[] tables = publishers.getTables(); 180 for (Table table : tables) { 181 final RowProcessingPublisher rowProcessingPublisher = publishers.getRowProcessingPublisher(table); 182 final List<TaskRunnable> initTasks = rowProcessingPublisher.createInitialTasks(_taskRunner, _resultQueue, 183 rowProcessorPublishersDoneCompletionListener, _datastore, analysisJobMetrics); 189 184 logger.debug("Scheduling {} tasks for row processing publisher: {}", initTasks.size(), rowProcessingPublisher); 190 185 for (TaskRunnable taskRunnable : initTasks) { … … 199 194 * @param injectionManager 200 195 */ 201 private void scheduleExplorers(final LifeCycleHelper lifeCycleHelper) { 196 private void scheduleExplorers(final LifeCycleHelper lifeCycleHelper, 197 final JobCompletionTaskListener jobCompletionTaskListener, final AnalysisJobMetrics analysisJobMetrics) { 202 198 final int numExplorerJobs = _explorerJobs.size(); 203 199 if (numExplorerJobs == 0) { 204 _jobCompletionTaskListener.onComplete(null);200 jobCompletionTaskListener.onComplete(null); 205 201 return; 206 202 } 207 203 208 204 final List<TaskRunnable> finalTasks = new ArrayList<TaskRunnable>(); 209 finalTasks.add(new TaskRunnable(null, _jobCompletionTaskListener));205 finalTasks.add(new TaskRunnable(null, jobCompletionTaskListener)); 210 206 finalTasks.add(new TaskRunnable(null, new CloseReferenceDataTaskListener(lifeCycleHelper))); 211 207 … … 215 211 // begin explorer jobs first because they can run independently ( 216 212 for (ExplorerJob explorerJob : _explorerJobs) { 217 final DatastoreConnection dataContextProvider = _datastore.openConnection(); 218 219 ExplorerBeanDescriptor<?> descriptor = explorerJob.getDescriptor(); 220 Explorer<?> explorer = descriptor.newInstance(); 221 222 finalTasks.add(new TaskRunnable(null, new CloseResourcesTaskListener(dataContextProvider))); 213 final ExplorerMetrics metrics = analysisJobMetrics.getExplorerMetrics(explorerJob); 214 215 final DatastoreConnection connection = _datastore.openConnection(); 216 217 final ExplorerBeanDescriptor<?> descriptor = explorerJob.getDescriptor(); 218 final Explorer<?> explorer = descriptor.newInstance(); 219 220 finalTasks.add(new TaskRunnable(null, new CloseResourcesTaskListener(connection))); 223 221 finalTasks.add(new TaskRunnable(null, new CloseBeanTaskListener(lifeCycleHelper, descriptor, explorer))); 224 222 225 223 // set up scheduling for the explorers 226 Task closeTask = new CollectResultsTask(explorer, _job, explorerJob, _resultQueue, _analysisListener); 227 TaskListener runFinishedListener = new RunNextTaskTaskListener(_taskRunner, closeTask, explorersDoneTaskListener); 228 Task runTask = new RunExplorerTask(explorer, _job, explorerJob, _datastore, _analysisListener); 224 final Task closeTask = new CollectResultsTask(explorer, _job, explorerJob, _resultQueue, _analysisListener); 225 final TaskListener runFinishedListener = new RunNextTaskTaskListener(_taskRunner, closeTask, 226 explorersDoneTaskListener); 227 final Task runTask = new RunExplorerTask(explorer, metrics, _datastore, _analysisListener); 229 228 230 229 TaskListener referenceDataInitFinishedListener = new RunNextTaskTaskListener(_taskRunner, runTask, … … 297 296 } 298 297 299 private void registerRowProcessingPublishers(Map<Table, RowProcessingPublisher> rowProcessingPublishers,300 MergedOutcomeJob mergedOutcomeJob) {301 302 Collection<RowProcessingPublisher> publishers = rowProcessingPublishers.values();303 for (RowProcessingPublisher rowProcessingPublisher : publishers) {304 boolean prerequisiteOutcomesExist = true;305 MergeInput[] mergeInputs = mergedOutcomeJob.getMergeInputs();306 for (MergeInput mergeInput : mergeInputs) {307 Outcome prerequisiteOutcome = mergeInput.getOutcome();308 if (!rowProcessingPublisher.containsOutcome(prerequisiteOutcome)) {309 prerequisiteOutcomesExist = false;310 break;311 }312 }313 314 if (prerequisiteOutcomesExist) {315 rowProcessingPublisher.addMergedOutcomeJob(mergedOutcomeJob);316 }317 }318 }319 320 private void registerRowProcessingPublishers(final Map<Table, RowProcessingPublisher> rowProcessingPublishers,321 final ConfigurableBeanJob<?> beanJob, final AnalysisListener listener, final TaskRunner taskRunner,322 final LifeCycleHelper lifeCycleHelper) {323 final Set<Column> physicalColumns = new HashSet<Column>();324 325 final InputColumn<?>[] inputColumns = beanJob.getInput();326 for (InputColumn<?> inputColumn : inputColumns) {327 physicalColumns.addAll(_sourceColumnFinder.findOriginatingColumns(inputColumn));328 }329 final Outcome[] requirements = beanJob.getRequirements();330 for (Outcome requirement : requirements) {331 physicalColumns.addAll(_sourceColumnFinder.findOriginatingColumns(requirement));332 }333 334 final Column[] physicalColumnsArray = physicalColumns.toArray(new Column[physicalColumns.size()]);335 final Table[] tables;336 if (physicalColumns.isEmpty()) {337 // if not dependent on any specific tables, make component available338 // for all tables339 Set<Table> allTables = new HashSet<Table>();340 Collection<InputColumn<?>> allSourceColumns = _job.getSourceColumns();341 for (InputColumn<?> inputColumn : allSourceColumns) {342 allTables.add(inputColumn.getPhysicalColumn().getTable());343 }344 tables = allTables.toArray(new Table[allTables.size()]);345 } else {346 tables = MetaModelHelper.getTables(physicalColumnsArray);347 }348 349 for (Table table : tables) {350 RowProcessingPublisher rowPublisher = rowProcessingPublishers.get(table);351 if (rowPublisher == null) {352 rowPublisher = new RowProcessingPublisher(_job, table, taskRunner, listener, lifeCycleHelper);353 rowProcessingPublishers.put(table, rowPublisher);354 }355 356 // register the physical columns needed by this job357 Column[] tableColumns = MetaModelHelper.getTableColumns(table, physicalColumnsArray);358 rowPublisher.addPhysicalColumns(tableColumns);359 360 // find which input columns (both physical or virtual) are needed by361 // this per-table instance362 InputColumn<?>[] localInputColumns = getLocalInputColumns(table, inputColumns);363 364 if (beanJob instanceof AnalyzerJob) {365 AnalyzerJob analyzerJob = (AnalyzerJob) beanJob;366 Analyzer<?> analyzer = analyzerJob.getDescriptor().newInstance();367 rowPublisher.addRowProcessingAnalyzerBean(analyzer, analyzerJob, localInputColumns);368 } else if (beanJob instanceof TransformerJob) {369 TransformerJob transformerJob = (TransformerJob) beanJob;370 Transformer<?> transformer = transformerJob.getDescriptor().newInstance();371 rowPublisher.addTransformerBean(transformer, transformerJob, localInputColumns);372 } else if (beanJob instanceof FilterJob) {373 FilterJob filterJob = (FilterJob) beanJob;374 Filter<?> filter = filterJob.getDescriptor().newInstance();375 rowPublisher.addFilterBean(filter, filterJob, localInputColumns);376 } else {377 throw new UnsupportedOperationException("Unsupported job type: " + beanJob);378 }379 }380 }381 382 private InputColumn<?>[] getLocalInputColumns(Table table, InputColumn<?>[] inputColumns) {383 if (table == null || inputColumns == null || inputColumns.length == 0) {384 return new InputColumn<?>[0];385 }386 List<InputColumn<?>> result = new ArrayList<InputColumn<?>>();387 for (InputColumn<?> inputColumn : inputColumns) {388 Set<Column> sourcePhysicalColumns = _sourceColumnFinder.findOriginatingColumns(inputColumn);389 for (Column physicalColumn : sourcePhysicalColumns) {390 if (table.equals(physicalColumn.getTable())) {391 result.add(inputColumn);392 break;393 }394 }395 }396 return result.toArray(new InputColumn<?>[result.size()]);397 }398 298 } -
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/CompositeAnalysisListener.java
r2919 r3040 31 31 import org.eobjects.analyzer.result.AnalyzerResult; 32 32 33 import org.eobjects.metamodel.schema.Table;34 35 33 public final class CompositeAnalysisListener implements AnalysisListener { 36 34 … … 50 48 51 49 @Override 52 public void jobBegin(AnalysisJob job ) {50 public void jobBegin(AnalysisJob job, AnalysisJobMetrics metrics) { 53 51 for (AnalysisListener delegate : _delegates) { 54 delegate.jobBegin(job );52 delegate.jobBegin(job, metrics); 55 53 } 56 54 } 57 55 58 56 @Override 59 public void jobSuccess(AnalysisJob job ) {57 public void jobSuccess(AnalysisJob job, AnalysisJobMetrics metrics) { 60 58 for (AnalysisListener delegate : _delegates) { 61 delegate.jobSuccess(job );59 delegate.jobSuccess(job, metrics); 62 60 } 63 61 } 64 62 65 63 @Override 66 public void rowProcessingBegin(AnalysisJob job, Table table, int expectedRows) {64 public void rowProcessingBegin(AnalysisJob job, RowProcessingMetrics metrics) { 67 65 for (AnalysisListener delegate : _delegates) { 68 delegate.rowProcessingBegin(job, table, expectedRows);66 delegate.rowProcessingBegin(job, metrics); 69 67 } 70 68 } 71 69 72 70 @Override 73 public void rowProcessingProgress(AnalysisJob job, Table table, int currentRow) {71 public void rowProcessingProgress(AnalysisJob job, RowProcessingMetrics metrics, int currentRow) { 74 72 for (AnalysisListener delegate : _delegates) { 75 delegate.rowProcessingProgress(job, table, currentRow);73 delegate.rowProcessingProgress(job, metrics, currentRow); 76 74 } 77 75 } 78 76 79 77 @Override 80 public void rowProcessingSuccess(AnalysisJob job, Table table) {78 public void rowProcessingSuccess(AnalysisJob job, RowProcessingMetrics metrics) { 81 79 for (AnalysisListener delegate : _delegates) { 82 delegate.rowProcessingSuccess(job, table);80 delegate.rowProcessingSuccess(job, metrics); 83 81 } 84 82 } 85 83 86 84 @Override 87 public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob ) {85 public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob, AnalyzerMetrics metrics) { 88 86 for (AnalysisListener delegate : _delegates) { 89 delegate.analyzerBegin(job, analyzerJob );87 delegate.analyzerBegin(job, analyzerJob, metrics); 90 88 } 91 89 } 92 90 93 91 @Override 94 public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob ) {92 public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob, ExplorerMetrics metrics) { 95 93 for (AnalysisListener delegate : _delegates) { 96 delegate.explorerBegin(job, explorerJob );94 delegate.explorerBegin(job, explorerJob, metrics); 97 95 } 98 96 } -
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/DebugLoggingAnalysisListener.java
r2919 r3040 30 30 import org.slf4j.LoggerFactory; 31 31 32 import org.eobjects.metamodel.schema.Table;33 34 32 /** 35 33 * AnalysisListener used for DEBUG level logging. This listener is obviously … … 51 49 52 50 @Override 53 public void jobBegin(AnalysisJob job ) {51 public void jobBegin(AnalysisJob job, AnalysisJobMetrics metrics) { 54 52 logger.debug("jobBegin({})", job); 55 53 } 56 54 57 55 @Override 58 public void jobSuccess(AnalysisJob job ) {56 public void jobSuccess(AnalysisJob job, AnalysisJobMetrics metrics) { 59 57 logger.debug("jobSuccess({})", job); 60 58 } 61 59 62 60 @Override 63 public void rowProcessingBegin(AnalysisJob job, Table table, int expectedRows) {64 logger.debug("rowProcessingBegin({}, {} , {})", new Object[] { job, table, expectedRows});61 public void rowProcessingBegin(AnalysisJob job, RowProcessingMetrics metrics) { 62 logger.debug("rowProcessingBegin({}, {})", new Object[] { job, metrics.getTable() }); 65 63 } 66 64 67 65 @Override 68 public void rowProcessingProgress(AnalysisJob job, Table table, int currentRow) {69 logger.debug("rowProcessingProgress({}, {}, {})", new Object[] { job, table, currentRow });66 public void rowProcessingProgress(AnalysisJob job, RowProcessingMetrics metrics, int currentRow) { 67 logger.debug("rowProcessingProgress({}, {}, {})", new Object[] { job, metrics.getTable(), currentRow }); 70 68 } 71 69 72 70 @Override 73 public void rowProcessingSuccess(AnalysisJob job, Table table) {74 logger.debug("rowProcessingSuccess({}, {})", new Object[] { job, table});71 public void rowProcessingSuccess(AnalysisJob job, RowProcessingMetrics metrics) { 72 logger.debug("rowProcessingSuccess({}, {})", new Object[] { job, metrics.getTable() }); 75 73 } 76 74 77 75 @Override 78 public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob ) {76 public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob, AnalyzerMetrics metrics) { 79 77 logger.debug("analyzerBegin({}, {})", new Object[] { job, analyzerJob }); 80 78 } 81 79 82 80 @Override 83 public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob ) {81 public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob, ExplorerMetrics metrics) { 84 82 logger.debug("explorerBegin({}, {})", new Object[] { job, explorerJob }); 85 83 } -
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/ErrorAwareAnalysisListener.java
r2919 r3040 35 35 import org.slf4j.LoggerFactory; 36 36 37 import org.eobjects.metamodel.schema.Table;38 39 37 /** 40 38 * AnalysisListener that will register errors … … 50 48 51 49 @Override 52 public void jobBegin(AnalysisJob job ) {50 public void jobBegin(AnalysisJob job, AnalysisJobMetrics metrics) { 53 51 } 54 52 55 53 @Override 56 public void jobSuccess(AnalysisJob job ) {54 public void jobSuccess(AnalysisJob job, AnalysisJobMetrics metrics) { 57 55 } 58 56 … … 113 111 114 112 @Override 115 public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob ) {113 public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob, ExplorerMetrics metrics) { 116 114 } 117 115 … … 121 119 122 120 @Override 123 public void rowProcessingBegin(AnalysisJob job, Table table, int expectedRows) {121 public void rowProcessingBegin(AnalysisJob job, RowProcessingMetrics metrics) { 124 122 } 125 123 126 124 @Override 127 public void rowProcessingProgress(AnalysisJob job, Table table, int currentRow) {125 public void rowProcessingProgress(AnalysisJob job, RowProcessingMetrics metrics, int currentRow) { 128 126 } 129 127 130 128 @Override 131 public void rowProcessingSuccess(AnalysisJob job, Table table) {129 public void rowProcessingSuccess(AnalysisJob job, RowProcessingMetrics metrics) { 132 130 } 133 131 134 132 @Override 135 public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob ) {133 public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob, AnalyzerMetrics metrics) { 136 134 } 137 135 -
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/InfoLoggingAnalysisListener.java
r2919 r3040 30 30 import org.slf4j.LoggerFactory; 31 31 32 import org.eobjects.metamodel.schema.Table;33 34 32 /** 35 33 * AnalysisListener used for INFO level logging. This listener will log … … 51 49 52 50 @Override 53 public void jobBegin(AnalysisJob job ) {51 public void jobBegin(AnalysisJob job, AnalysisJobMetrics metrics) { 54 52 // do nothing 55 53 } 56 54 57 55 @Override 58 public void jobSuccess(AnalysisJob job ) {56 public void jobSuccess(AnalysisJob job, AnalysisJobMetrics metrics) { 59 57 // do nothing 60 58 } 61 59 62 60 @Override 63 public void rowProcessingBegin(AnalysisJob job, Table table, int expectedRows) {64 logger.info("Beginning row processing of {} rows in {}", new Object[] { expectedRows, table });61 public void rowProcessingBegin(AnalysisJob job, RowProcessingMetrics metrics) { 62 logger.info("Beginning row processing of {}", metrics.getTable()); 65 63 } 66 64 67 65 @Override 68 public void rowProcessingProgress(AnalysisJob job, Table table, int currentRow) {66 public void rowProcessingProgress(AnalysisJob job, RowProcessingMetrics metrics, int currentRow) { 69 67 if (currentRow > 0 && currentRow % 1000 == 0) { 70 logger.info("Reading row no. {} in {}", new Object[] { currentRow, table.getName() });68 logger.info("Reading row no. {} in {}", new Object[] { currentRow, metrics.getTable().getName() }); 71 69 } 72 70 } … … 78 76 79 77 @Override 80 public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob ) {78 public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob, ExplorerMetrics metrics) { 81 79 // do nothing 82 80 } … … 88 86 89 87 @Override 90 public void rowProcessingSuccess(AnalysisJob job, Table table) {88 public void rowProcessingSuccess(AnalysisJob job, RowProcessingMetrics metrics) { 91 89 // do nothing 92 90 } 93 91 94 92 @Override 95 public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob ) {93 public void analyzerBegin(AnalysisJob job, AnalyzerJob analyzerJob, AnalyzerMetrics metrics) { 96 94 // do nothing 97 95 } -
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/runner/RowProcessingPublisher.java
r3014 r3040 23 23 import java.util.Arrays; 24 24 import java.util.Collection; 25 import java.util.Collections;26 25 import java.util.HashSet; 27 26 import java.util.List; … … 33 32 import org.eobjects.analyzer.beans.api.Filter; 34 33 import org.eobjects.analyzer.beans.api.Transformer; 35 import org.eobjects.analyzer.beans.convert.ConvertToNumberTransformer;36 34 import org.eobjects.analyzer.connection.Datastore; 37 35 import org.eobjects.analyzer.connection.DatastoreConnection; … … 69 67 import org.eobjects.metamodel.schema.Table; 70 68 import org.eobjects.metamodel.util.CollectionUtils; 69 import org.eobjects.metamodel.util.LazyRef; 71 70 import org.eobjects.metamodel.util.Predicate; 72 71 import org.slf4j.Logger; … … 77 76 private final static Logger logger = LoggerFactory.getLogger(RowProcessingPublisher.class); 78 77 78 private final AnalysisJob _analysisJob; 79 private final Table _table; 79 80 private final Set<Column> _physicalColumns = new HashSet<Column>(); 80 81 private final List<RowProcessingConsumer> _consumers = new ArrayList<RowProcessingConsumer>(); 81 private final AnalysisJob _job;82 private final Table _table;83 82 private final TaskRunner _taskRunner; 84 83 private final AnalysisListener _analysisListener; 85 84 private final LifeCycleHelper _lifeCycleHelper; 86 87 public RowProcessingPublisher(AnalysisJob job, Table table, TaskRunner taskRunner, AnalysisListener analysisListener, 88 LifeCycleHelper lifeCycleHelper) { 89 if (job == null) { 90 throw new IllegalArgumentException("AnalysisJob cannot be null"); 91 } 85 private final LazyRef<RowProcessingQueryOptimizer> _queryOptimizerRef; 86 87 public RowProcessingPublisher(AnalysisJob analysisJob, Table table, TaskRunner taskRunner, 88 AnalysisListener analysisListener, LifeCycleHelper lifeCycleHelper) { 92 89 if (table == null) { 93 90 throw new IllegalArgumentException("Table cannot be null"); … … 99 96 throw new IllegalArgumentException("AnalysisListener cannot be null"); 100 97 } 101 _ job = job;98 _analysisJob = analysisJob; 102 99 _table = table; 103 100 _taskRunner = taskRunner; 104 101 _analysisListener = analysisListener; 105 102 _lifeCycleHelper = lifeCycleHelper; 103 104 _queryOptimizerRef = new LazyRef<RowProcessingQueryOptimizer>() { 105 @Override 106 protected RowProcessingQueryOptimizer fetch() { 107 final Datastore datastore = _analysisJob.getDatastore(); 108 final DatastoreConnection con = datastore.openConnection(); 109 try { 110 final DataContext dataContext = con.getDataContext(); 111 112 final Column[] columnArray = _physicalColumns.toArray(new Column[_physicalColumns.size()]); 113 final Query baseQuery = dataContext.query().from(_table).select(columnArray).toQuery(); 114 115 logger.debug("Base query for row processing: {}", baseQuery); 116 117 final RowProcessingConsumerSorter sorter = new RowProcessingConsumerSorter(_consumers); 118 final List<RowProcessingConsumer> sortedConsumers = sorter.createProcessOrderedConsumerList(); 119 if (logger.isDebugEnabled()) { 120 logger.debug("Row processing order ({} consumers):", sortedConsumers.size()); 121 int i = 1; 122 for (RowProcessingConsumer rowProcessingConsumer : sortedConsumers) { 123 logger.debug(" {}) {}", i, rowProcessingConsumer); 124 i++; 125 } 126 } 127 128 final RowProcessingQueryOptimizer optimizer = new RowProcessingQueryOptimizer(datastore, 129 sortedConsumers, baseQuery); 130 return optimizer; 131 } finally { 132 con.close(); 133 } 134 } 135 }; 136 } 137 138 public void initialize() { 139 // can safely load query optimizer in separate thread here 140 _queryOptimizerRef.requestLoad(); 106 141 } 107 142 … … 116 151 } 117 152 118 public void run() { 153 private RowProcessingQueryOptimizer getQueryOptimizer() { 154 return _queryOptimizerRef.get(); 155 } 156 157 public Query getQuery() { 158 return getQueryOptimizer().getOptimizedQuery(); 159 } 160 161 public void run(RowProcessingMetrics rowProcessingMetrics) { 119 162 for (RowProcessingConsumer rowProcessingConsumer : _consumers) { 120 163 if (rowProcessingConsumer instanceof AnalyzerConsumer) { 121 AnalyzerJob analyzerJob = ((AnalyzerConsumer) rowProcessingConsumer).getComponentJob(); 122 _analysisListener.analyzerBegin(_job, analyzerJob); 123 } 124 } 125 126 final Datastore datastore = _job.getDatastore(); 164 final AnalyzerConsumer analyzerConsumer = (AnalyzerConsumer) rowProcessingConsumer; 165 final AnalyzerJob analyzerJob = analyzerConsumer.getComponentJob(); 166 final AnalyzerMetrics metrics = rowProcessingMetrics.getAnalysisJobMetrics().getAnalyzerMetrics(analyzerJob); 167 _analysisListener.analyzerBegin(_analysisJob, analyzerJob, metrics); 168 } 169 } 170 final RowProcessingQueryOptimizer queryOptimizer = getQueryOptimizer(); 171 final Query finalQuery = queryOptimizer.getOptimizedQuery(); 172 final List<RowProcessingConsumer> consumers = queryOptimizer.getOptimizedConsumers(); 173 final Collection<? extends Outcome> availableOutcomes = queryOptimizer.getOptimizedAvailableOutcomes(); 174 175 _analysisListener.rowProcessingBegin(_analysisJob, rowProcessingMetrics); 176 177 // TODO: Needs to delegate errors downstream 178 final RowConsumerTaskListener taskListener = new RowConsumerTaskListener(_analysisJob, _analysisListener, 179 _taskRunner); 180 final AtomicInteger rowNumber = new AtomicInteger(0); 181 182 final Datastore datastore = _analysisJob.getDatastore(); 127 183 final DatastoreConnection con = datastore.openConnection(); 128 final DataContext dataContext = con.getDataContext(); 129 130 final Query finalQuery; 131 final List<RowProcessingConsumer> finalConsumers; 132 final Collection<? extends Outcome> availableOutcomes; 133 { 134 final Column[] columnArray = _physicalColumns.toArray(new Column[_physicalColumns.size()]); 135 final Query baseQuery = dataContext.query().from(_table).select(columnArray).toQuery(); 136 137 logger.debug("Base query for row processing: {}", baseQuery); 138 139 final RowProcessingConsumerSorter sorter = new RowProcessingConsumerSorter(_consumers); 140 final List<RowProcessingConsumer> sortedConsumers = sorter.createProcessOrderedConsumerList(); 141 if (logger.isDebugEnabled()) { 142 logger.debug("Row processing order ({} consumers):", sortedConsumers.size()); 143 int i = 1; 144 for (RowProcessingConsumer rowProcessingConsumer : sortedConsumers) { 145 logger.debug(" {}) {}", i, rowProcessingConsumer); 146 i++; 184 185 try { 186 final DataContext dataContext = con.getDataContext(); 187 final DataSet dataSet = dataContext.executeQuery(finalQuery); 188 189 // represents the distinct count of rows as well as the number of 190 // tasks 191 // to execute 192 int numTasks = 0; 193 194 try { 195 196 while (dataSet.next()) { 197 if (taskListener.isErrornous()) { 198 break; 199 } 200 Row metaModelRow = dataSet.getRow(); 201 ConsumeRowTask task = new ConsumeRowTask(consumers, rowProcessingMetrics, metaModelRow, rowNumber, 202 _analysisListener, availableOutcomes); 203 _taskRunner.run(task, taskListener); 204 numTasks++; 147 205 } 148 } 149 150 final RowProcessingQueryOptimizer optimizer = new RowProcessingQueryOptimizer(datastore, sortedConsumers, 151 baseQuery); 152 if (optimizer.isOptimizable()) { 153 finalQuery = optimizer.getOptimizedQuery(); 154 finalConsumers = optimizer.getOptimizedConsumers(); 155 availableOutcomes = optimizer.getOptimizedAvailableOutcomes(); 156 logger.info("Base query was optimizable to: {}, (maxrows={})", finalQuery, finalQuery.getMaxRows()); 157 } else { 158 finalQuery = baseQuery; 159 finalConsumers = sortedConsumers; 160 availableOutcomes = Collections.emptyList(); 161 } 162 } 163 164 int expectedRows = -1; 165 { 166 final Query countQuery = finalQuery.clone(); 167 countQuery.setMaxRows(null); 168 countQuery.getSelectClause().removeItems(); 169 countQuery.selectCount(); 170 countQuery.getSelectClause().getItem(0).setFunctionApproximationAllowed(true); 171 final DataSet countDataSet = dataContext.executeQuery(countQuery); 172 if (countDataSet.next()) { 173 Number count = ConvertToNumberTransformer.transformValue(countDataSet.getRow().getValue(0)); 174 if (count != null) { 175 expectedRows = count.intValue(); 176 } 177 } 178 Integer maxRows = finalQuery.getMaxRows(); 179 if (maxRows != null) { 180 expectedRows = Math.min(expectedRows, maxRows.intValue()); 181 } 182 } 183 184 _analysisListener.rowProcessingBegin(_job, _table, expectedRows); 185 186 // TODO: Needs to delegate errors downstream 187 final RowConsumerTaskListener taskListener = new RowConsumerTaskListener(_job, _analysisListener, _taskRunner); 188 final AtomicInteger rowNumber = new AtomicInteger(0); 189 final DataSet dataSet = dataContext.executeQuery(finalQuery); 190 191 // represents the distinct count of rows as well as the number of tasks 192 // to execute 193 int numTasks = 0; 194 195 while (dataSet.next()) { 196 if (taskListener.isErrornous()) { 197 break; 198 } 199 Row metaModelRow = dataSet.getRow(); 200 ConsumeRowTask task = new ConsumeRowTask(finalConsumers, _table, metaModelRow, rowNumber, _job, 201 _analysisListener, availableOutcomes); 202 _taskRunner.run(task, taskListener); 203 numTasks++; 204 } 205 206 taskListener.awaitTasks(numTasks); 207 208 dataSet.close(); 209 con.close(); 206 207 } finally { 208 dataSet.close(); 209 } 210 taskListener.awaitTasks(numTasks); 211 212 } finally { 213 con.close(); 214 } 210 215 211 216 if (!taskListener.isErrornous()) { 212 _analysisListener.rowProcessingSuccess(_ job, _table);217 _analysisListener.rowProcessingSuccess(_analysisJob, rowProcessingMetrics); 213 218 } 214 219 } 215 220 216 221 public void addRowProcessingAnalyzerBean(Analyzer<?> analyzer, AnalyzerJob analyzerJob, InputColumn<?>[] inputColumns) { 217 addConsumer(new AnalyzerConsumer(_ job, analyzer, analyzerJob, inputColumns, _analysisListener));222 addConsumer(new AnalyzerConsumer(_analysisJob, analyzer, analyzerJob, inputColumns, _analysisListener)); 218 223 } 219 224 220 225 public void addTransformerBean(Transformer<?> transformer, TransformerJob transformerJob, InputColumn<?>[] inputColumns) { 221 addConsumer(new TransformerConsumer(_ job, transformer, transformerJob, inputColumns, _analysisListener));226 addConsumer(new TransformerConsumer(_analysisJob, transformer, transformerJob, inputColumns, _analysisListener)); 222 227 } 223 228 224 229 public void addFilterBean(Filter<?> filter, FilterJob filterJob, InputColumn<?>[] inputColumns) { 225 addConsumer(new FilterConsumer(_ job, filter, filterJob, inputColumns, _analysisListener));230 addConsumer(new FilterConsumer(_analysisJob, filter, filterJob, inputColumns, _analysisListener)); 226 231 } 227 232 … … 250 255 251 256 public List<TaskRunnable> createInitialTasks(TaskRunner taskRunner, Queue<JobAndResult> resultQueue, 252 TaskListener rowProcessorPublishersTaskListener, Datastore datastore ) {257 TaskListener rowProcessorPublishersTaskListener, Datastore datastore, AnalysisJobMetrics analysisJobMetrics) { 253 258 254 259 final List<RowProcessingConsumer> configurableConsumers = CollectionUtils.filter(_consumers, … … 278 283 final TaskListener runCompletionListener = new ForkTaskListener("run row processing", taskRunner, closeTasks); 279 284 280 final RunRowProcessingPublisherTask runTask = new RunRowProcessingPublisherTask(this); 285 final RowProcessingMetrics rowProcessingMetrics = analysisJobMetrics.getRowProcessingMetrics(_table); 286 final RunRowProcessingPublisherTask runTask = new RunRowProcessingPublisherTask(this, rowProcessingMetrics); 281 287 282 288 final TaskListener referenceDataInitFinishedListener = new ForkTaskListener("Initialize row consumers", taskRunner, … … 300 306 AnalyzerConsumer analyzerConsumer = (AnalyzerConsumer) consumer; 301 307 Analyzer<?> analyzer = analyzerConsumer.getComponent(); 302 return new CollectResultsTask(analyzer, _ job, consumer.getComponentJob(), resultQueue, _analysisListener);308 return new CollectResultsTask(analyzer, _analysisJob, consumer.getComponentJob(), resultQueue, _analysisListener); 303 309 } else { 304 310 throw new IllegalStateException("Unknown consumer type: " + consumer); … … 321 327 return "RowProcessingPublisher[table=" + _table.getQualifiedLabel() + ", consumers=" + _consumers.size() + "]"; 322 328 } 329 330 public AnalyzerJob[] getAnalyzerJobs() { 331 List<AnalyzerJob> analyzerJobs = new ArrayList<AnalyzerJob>(); 332 for (RowProcessingConsumer consumer : _consumers) { 333 if (consumer instanceof AnalyzerConsumer) { 334 AnalyzerJob analyzerJob = ((AnalyzerConsumer) consumer).getComponentJob(); 335 analyzerJobs.add(analyzerJob); 336 } 337 } 338 return analyzerJobs.toArray(new AnalyzerJob[analyzerJobs.size()]); 339 } 323 340 } -
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/tasks/ConsumeRowTask.java
r2706 r3040 27 27 import org.eobjects.analyzer.data.InputRow; 28 28 import org.eobjects.analyzer.data.MetaModelInputRow; 29 import org.eobjects.analyzer.job.AnalysisJob;30 29 import org.eobjects.analyzer.job.Outcome; 31 30 import org.eobjects.analyzer.job.runner.AnalysisListener; … … 33 32 import org.eobjects.analyzer.job.runner.OutcomeSinkImpl; 34 33 import org.eobjects.analyzer.job.runner.RowProcessingConsumer; 34 import org.eobjects.analyzer.job.runner.RowProcessingMetrics; 35 35 import org.eobjects.metamodel.data.Row; 36 import org.eobjects.metamodel.schema.Table;37 36 37 /** 38 * A {@link Task} that dispatches ("consumes") a record to all relevant 39 * {@link RowProcessingConsumer}s (eg. analyzerbeans components). 40 * 41 * @author Kasper SÞrensen 42 */ 38 43 public final class ConsumeRowTask implements Task { 39 44 40 45 private final Iterable<RowProcessingConsumer> _consumers; 41 private final Table _table;46 private final RowProcessingMetrics _rowProcessingMetrics; 42 47 private final Row _row; 43 48 private final AnalysisListener _analysisListener; 44 private final AnalysisJob _job;45 49 private final AtomicInteger _rowCounter; 46 50 private final Collection<? extends Outcome> _initialOutcomes; … … 49 53 * 50 54 * @param consumers 51 * @param table55 * @param rowProcessingMetrics 52 56 * @param row 53 57 * @param rowCounter 54 * @param job55 58 * @param analysisListener 56 59 * @param initialOutcomes … … 58 61 * will contain query-optimized outcomes) 59 62 */ 60 public ConsumeRowTask(Iterable<RowProcessingConsumer> consumers, Table table, Row row, AtomicInteger rowCounter,61 A nalysisJob job, AnalysisListener analysisListener, Collection<? extends Outcome> initialOutcomes) {63 public ConsumeRowTask(Iterable<RowProcessingConsumer> consumers, RowProcessingMetrics rowProcessingMetrics, Row row, 64 AtomicInteger rowCounter, AnalysisListener analysisListener, Collection<? extends Outcome> initialOutcomes) { 62 65 _consumers = consumers; 63 _ table = table;66 _rowProcessingMetrics = rowProcessingMetrics; 64 67 _row = row; 65 68 _rowCounter = rowCounter; 66 _job = job;67 69 _analysisListener = analysisListener; 68 70 _initialOutcomes = initialOutcomes; … … 86 88 } 87 89 } 88 _analysisListener.rowProcessingProgress(_job, _table, rowNumber); 90 _analysisListener.rowProcessingProgress(_rowProcessingMetrics.getAnalysisJobMetrics().getAnalysisJob(), 91 _rowProcessingMetrics, rowNumber); 89 92 } 90 93 -
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/tasks/RunExplorerTask.java
r3014 r3040 23 23 import org.eobjects.analyzer.connection.Datastore; 24 24 import org.eobjects.analyzer.connection.DatastoreConnection; 25 import org.eobjects.analyzer.job.AnalysisJob;26 import org.eobjects.analyzer.job.ExplorerJob;27 25 import org.eobjects.analyzer.job.runner.AnalysisListener; 26 import org.eobjects.analyzer.job.runner.ExplorerMetrics; 28 27 import org.eobjects.metamodel.DataContext; 29 28 import org.slf4j.Logger; … … 35 34 36 35 private final Explorer<?> _explorer; 37 private final AnalysisJob _job;38 private final ExplorerJob _explorerJob;39 36 private final Datastore _datastore; 40 37 private final AnalysisListener _analysisListener; 38 private final ExplorerMetrics _explorerMetrics; 41 39 42 public RunExplorerTask(Explorer<?> explorer, AnalysisJob job, ExplorerJob explorerJob, Datastore datastore,40 public RunExplorerTask(Explorer<?> explorer, ExplorerMetrics explorerMetrics, Datastore datastore, 43 41 AnalysisListener analysisListener) { 44 42 _explorer = explorer; 45 _job = job; 46 _explorerJob = explorerJob; 43 _explorerMetrics = explorerMetrics; 47 44 _datastore = datastore; 48 45 _analysisListener = analysisListener; … … 54 51 55 52 if (_analysisListener != null) { 56 _analysisListener.explorerBegin(_job, _explorerJob); 53 _analysisListener.explorerBegin(_explorerMetrics.getAnalysisJobMetrics().getAnalysisJob(), 54 _explorerMetrics.getExplorerJob(), _explorerMetrics); 57 55 } 58 56 DatastoreConnection con = _datastore.openConnection(); -
AnalyzerBeans/trunk/core/src/main/java/org/eobjects/analyzer/job/tasks/RunRowProcessingPublisherTask.java
r1260 r3040 20 20 package org.eobjects.analyzer.job.tasks; 21 21 22 import org.eobjects.analyzer.job.runner.RowProcessingMetrics; 22 23 import org.eobjects.analyzer.job.runner.RowProcessingPublisher; 23 24 import org.slf4j.Logger; … … 28 29 private final Logger logger = LoggerFactory.getLogger(getClass()); 29 30 30 private final RowProcessingPublisher _rowProcessingPublisher; 31 private final RowProcessingPublisher _publisher; 32 private final RowProcessingMetrics _metrics; 31 33 32 public RunRowProcessingPublisherTask(RowProcessingPublisher rowProcessingPublisher) { 33 _rowProcessingPublisher = rowProcessingPublisher; 34 public RunRowProcessingPublisherTask(RowProcessingPublisher publisher, RowProcessingMetrics metrics) { 35 _publisher = publisher; 36 _metrics = metrics; 34 37 } 35 38 … … 38 41 logger.debug("execute()"); 39 42 40 _ rowProcessingPublisher.run();43 _publisher.run(_metrics); 41 44 } 42 45 -
AnalyzerBeans/trunk/core/src/test/java/org/eobjects/analyzer/test/full/scenarios/PatternFinderAndStringAnalyzerDrillToDetailTest.java
r2749 r3040 66 66 AnalysisJobBuilder ajb = new AnalysisJobBuilder(configuration); 67 67 ajb.setDatastoreConnection(con); 68 68 69 69 Table table = dc.getDefaultSchema().getTableByName("EMPLOYEES"); 70 70 assertNotNull(table); … … 82 82 EmailStandardizerTransformer.class).addInputColumn(emailInputColumn); 83 83 84 AnalyzerJobBuilder<PatternFinderAnalyzer> pf = ajb 85 .addAnalyzer(PatternFinderAnalyzer.class); 84 AnalyzerJobBuilder<PatternFinderAnalyzer> pf = ajb.addAnalyzer(PatternFinderAnalyzer.class); 86 85 InputColumn<?> jobtitleInputColumn = ajb.getSourceColumnByName("JOBTITLE"); 87 86 pf.addInputColumn(jobtitleInputColumn); -
DataCleaner/trunk/core/src/main/java/org/eobjects/datacleaner/util/AnalysisRunnerSwingWorker.java
r3034 r3040 34 34 import org.eobjects.analyzer.job.FilterJob; 35 35 import org.eobjects.analyzer.job.TransformerJob; 36 import org.eobjects.analyzer.job.runner.AnalysisJobMetrics; 36 37 import org.eobjects.analyzer.job.runner.AnalysisListener; 37 38 import org.eobjects.analyzer.job.runner.AnalysisResultFuture; 38 39 import org.eobjects.analyzer.job.runner.AnalysisRunner; 39 40 import org.eobjects.analyzer.job.runner.AnalysisRunnerImpl; 41 import org.eobjects.analyzer.job.runner.AnalyzerMetrics; 42 import org.eobjects.analyzer.job.runner.ExplorerMetrics; 43 import org.eobjects.analyzer.job.runner.RowProcessingMetrics; 40 44 import org.eobjects.analyzer.result.AnalyzerResult; 41 45 import org.eobjects.analyzer.util.SourceColumnFinder; … … 78 82 79 83 @Override 80 public void jobBegin(AnalysisJob job ) {84 public void jobBegin(AnalysisJob job, AnalysisJobMetrics metrics) { 81 85 String now = new DateTime().toString(DateTimeFormat.fullTime()); 82 86 _progressInformationPanel.addUserLog("Job begin (" + now + ")"); … … 84 88 85 89 @Override 86 public void jobSuccess(AnalysisJob job ) {90 public void jobSuccess(AnalysisJob job, AnalysisJobMetrics metrics) { 87 91 String now = new DateTime().toString(DateTimeFormat.fullTime()); 88 92 _progressInformationPanel.addUserLog("Job success (" + now + ")"); … … 91 95 92 96 @Override 93 public void rowProcessingBegin(final AnalysisJob job, final Table table, final int expectedRows) { 97 public void rowProcessingBegin(final AnalysisJob job, final RowProcessingMetrics metrics) { 98 final int expectedRows = metrics.getExpectedRows(); 99 final Table table = metrics.getTable(); 94 100 if (expectedRows == -1) { 95 101 _progressInformationPanel.addUserLog("Starting row processing for " + table.getQualifiedLabel()); … … 102 108 103 109 @Override 104 public void rowProcessingProgress(AnalysisJob job, final Table table, final int currentRow) {105 _progressInformationPanel.updateProgress( table, currentRow);110 public void rowProcessingProgress(AnalysisJob job, final RowProcessingMetrics metrics, final int currentRow) { 111 _progressInformationPanel.updateProgress(metrics.getTable(), currentRow); 106 112 } 107 113 108 114 @Override 109 public void rowProcessingSuccess(AnalysisJob job, final Table table) {115 public void rowProcessingSuccess(AnalysisJob job, final RowProcessingMetrics metrics) { 110 116 String now = new DateTime().toString(DateTimeFormat.fullTime()); 111 _progressInformationPanel.addUserLog("Row processing for " + table.getQualifiedLabel() + " finished (" + now112 + "). Generating results ...");117 _progressInformationPanel.addUserLog("Row processing for " + metrics.getTable().getQualifiedLabel() + " finished (" 118 + now + "). Generating results ..."); 113 119 } 114 120 115 121 @Override 116 public void analyzerBegin(AnalysisJob job, final AnalyzerJob analyzerJob ) {122 public void analyzerBegin(AnalysisJob job, final AnalyzerJob analyzerJob, AnalyzerMetrics metrics) { 117 123 _progressInformationPanel.addUserLog("Starting analyzer '" + LabelUtils.getLabel(analyzerJob) + "'"); 118 124 } … … 174 180 175 181 @Override 176 public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob ) {182 public void explorerBegin(AnalysisJob job, ExplorerJob explorerJob, ExplorerMetrics metrics) { 177 183 _progressInformationPanel.addUserLog("Starting explorer '" + LabelUtils.getLabel(explorerJob) + "'"); 178 184 }
Note: See TracChangeset
for help on using the changeset viewer.
