Source code for asterism.pipeline_manager.analysis_processes

"""
Module: analysis_process
========================



Overview
--------

This module provides the implementation of the base class :class:`.AnalysisProcess` class
used to create the create specific Processes.

.. graphviz::

    digraph p  {
        subgraph cluster0 {
        node [style=filled];
        "Task 1" -> "Task 2"->" Task 3"->"..."->"Task n";
        "Task 3" -> "Task 1";
        label = "Process 1";
        }
    }





Classes relations
---------------------------------------
.. figure::
   :align:   center


Classes and Inheritance Structure
----------------------------------------------
.. inheritance-diagram::




Summary
---------
.. autosummary::
    AnalysisProducts
    AnalysisProcessProducts
    AnalysisProcess
    DoPlotTask

Module API
-----------
"""

from __future__ import division, absolute_import

import sys
import  traceback

import numpy as np

from asterism.pipeline_manager.analysis_products import AnalysisProduct
from .analysis_tasks import AnalysisTask

__author__ = 'andrea tramacere'


[docs]class AnalysisProcess(object): """ This class is a base class to handle Processes. Processes are combinations of :class:`.AnalysisTask`. The combination of Tasks is implemented by the member `func`, a function that is implemented in each derived class, specific for each derived class, and that is passed as an argument of the constructor. The `func` is called by the method :meth:`.AnalysisProcess.run`. """ def __init__(self,name,func,plot_func=None,parser=None,add_plot_task=True): """ Parameters ---------- name : str Name of the Process func : callable A funtion that implements the algorithm to combine :class:`.AnalysisTask` plot_func : callable A plot function parser : add_plot_task : Returns ------- """ self.name=name self.plot_func=plot_func self.func=func self.tasks_list=[] self.parser=parser if add_plot_task==True: self.add_analysis_task(DoPlotTask,'do_plot' )
[docs] def add_analysis_task(self,task_class,name): for task in self.tasks_list: if task.name==name: raise ValueError('task name, already present') print "add task",name task_class_inst=task_class(name,parser=self.parser) task_class_inst.process=self setattr(self,name,task_class_inst) self.tasks_list.append(getattr(self,name))
[docs] def set_pars_from_parser(self,conf_file_lines,args,argv,args_dict): for task in self.tasks_list: print "|set pars for task",task.name task.set_pars_from_parser(args,argv,args_dict,conf_file_lines=conf_file_lines) if task.name=='do_plot': self.set_plotting()
[docs] def set_par(self,task_name,par_name,**kwargs): for task in self.tasks_list: if task.name==task_name: task.parameters_list.set_par(par_name,**kwargs)
[docs] def list_parameters(self): print "|-------------------------------------------------------" print "|Parameters for process: ",self.name print "|-------------------------------------------------------" for task in self.tasks_list: task.list_parameters() print "|-------------------------------------------------------" print
[docs] def dump_to_conf_file(self,file): #f=open(file_name,'w') print>>file print>>file,"[ Process: %s: start ]"%self.name for task in self.tasks_list: task.parameters_list.dump_to_conf_file(file) print>>file,"[ Process: %s: stop ]"%self.name print>>file
#f.close()
[docs] def run(self,image,image_id=None,extra_message=None,**kwargs): """ Method that calls the `.func` to combine :class:`.AnalysisTask` and produce the output Parameters ---------- image: image_id: extra_message: **kwargs: Returns ------- prods : :class:`.AnalysisProductsCollection` Class storing the process output. """ self.start_message(extra_message=extra_message) self.exit_status='SUCCESS' self.exception_mess='' try: prods_collection=self.func(self,image,image_id,**kwargs) except Exception as e: self.exit_status='FAILED' self.exception_mess=e prods_collection=None print "Failed process:", self.name print "Image ID:", image_id traceback.print_exc(file=sys.stdout) sys.exit(1) self.stop_message() return prods_collection
[docs] def set_plotting(self): self.force_no_plot=self.do_plot.get_par_value('force_no_plot') self.plot=self.do_plot.get_par_value('plot') self.save_plot=self.do_plot.get_par_value('save_plot')
[docs] def run_plotting(self,plot_func,*args,**kwargs): self.set_plotting() if self.force_no_plot is False: if self.plot==True or self.save_plot==True : fig=plot_func(*args,**kwargs) return fig
[docs] def start_message(self,extra_message=None): print "|-------------------------------------------------------" print "| Process:",self.name, "Start" if extra_message != None: print "|",extra_message print "|-------------------------------------------------------"
[docs] def stop_message(self,): print "|-------------------------------------------------------" print "| Porcess:",self.name, "Done" print "|-------------------------------------------------------" print print
[docs]def foo(): pass
[docs]class DoPlotTask(AnalysisTask): def __init__(self,name='do_plot',func=foo,parser=None,process=None): super(DoPlotTask,self).__init__(name,func=func,parser=parser,process=process) self.parameters_list.add_par('-plot', help='to plot results',action='store_true') self.parameters_list.add_par('-save_plot', help='to save plots to pdf file',action='store_true') self.parameters_list.add_par('-force_no_plot', help='force no plotting',action='store_true')
[docs] def run(self): pass