Source code for asterism.pipeline_manager.analysis_pipeline

"""
Module: analysis_pipeline
========================
Overview
--------
This module provides the implementation of the base class :class:`.AnalysisPipeline` class
used to create specific Pipelines that combines together different :class:`.AnalysisProcess`.
In the diagram the Pipeline #1 is combining different  :class:`.AnalysisProcess` P1...Pn.
See :class:`.AnalysisPipeline` for more details .

.. graphviz::

    digraph p  {
        subgraph  cluster0 {
        node [style=filled];
        "Process 1" -> "Process 2"->"Process 3"->"P..."->"Process n";
        "Process 3" -> "Process 1";
        label = "Pipeline 1";
        }

        subgraph cluster1 {
        node [style=filled];
        "Task 1" -> "Task 2"->"Task 3"->"T..."->"Task n";
        "Task 3" -> "Task 1";
        label = "Process 1";
        }


    }


Classes relations
---------------------------------------
.. figure::
   :align:   center


Classes and Inheritance Structure
----------------------------------------------
.. inheritance-diagram::




Summary
---------
.. autosummary::
    AnalysisProducts
    AnalysisPipeline
    IOConfProcess
    DoIOConfTask

Module API
-----------
"""
from __future__ import division, absolute_import,print_function
from .analysis_tasks import AnalysisTask
from .analysis_processes import AnalysisProcess

from .processing_tools import *
from .analysis_products import *
import glob 
from  astropy.io import fits as pf
import os
import numpy as np
import traceback
from  ..core.image_manager.image import Image
import time

__author__ = 'andrea tramacere'





[docs]class AnalysisPipeline(object): def __init__(self,name,func,plot_func=None,parser=None,argv=None,conf_file=None): self.name=name self.plot_func=plot_func self.func=func self.processes_list=[] self.parser=parser self.argv=argv self.conf_file=conf_file self.args=None self.args_dict=None self.args=None self.analysis_array_dtype=None self.add_analysis_process(IOConfProcess,'do_IO_conf',add_plot_task=False) self.IO_conf_task=self.get_process('do_IO_conf').do_IO_conf
[docs] def get_process(self,proc_name): found=False for process in self.processes_list: if proc_name==process.name: #print "-->",par_name,par.name,par.value,getattr(par,'value') found=True return process if found==False: process=None raise RuntimeError('process %s not found'%proc_name) return process
[docs] def add_analysis_process(self,process_class,name,add_plot_task=True): for process in self.processes_list: if process.name==name: raise ValueError('process name, already present') print ("add process",name ) process_class_inst=process_class(name,parser=self.parser,add_plot_task=add_plot_task) setattr(self,name,process_class_inst) self.processes_list.append(getattr(self,name)) if self.parser!=None: self.args= self.parser.parse_args()
#print self.args
[docs] def set_pars_from_parser(self): self.args = self.parser.parse_args() self.args_dict= self.args.__dict__ if self.args.dump_conf_file==False: conf_file_lines=read_config_file(self.args.conf_file) else: self.dump_configuration_file(self.args.conf_file) conf_file_lines=read_config_file(self.args.conf_file) for process in self.processes_list: process.set_pars_from_parser(conf_file_lines,self.args,self.argv,self.args_dict)
[docs] def set_pars_from_conf_file(self): conf_file_lines=read_config_file(self.conf_file) for process in self.processes_list: process.set_pars_from_parser(conf_file_lines,self.args,self.argv,self.args_dict)
[docs] def list_parameters(self): print() print("|-------------------------------------------------------") print("|Parameters for Script: ",self.name) print("|") print("|-------------------------------------------------------") print () for process in self.processes_list: process.list_parameters() print ("|-------------------------------------------------------") print()
[docs] def dump_configuration_file(self,file_name): f=open(file_name,'w') print(file=f) for process in self.processes_list: process.dump_to_conf_file(f) print(file=f) f.close()
[docs] def run(self,*args,**kwargs): exception_mess='' if self.args is not None: if self.args.dump_conf_file==True: return None self.set_infiles_list() self.set_io_conf() #self.log_stat=self.IO_conf_task.get_par_value('log_stat') self.script_exit_f=open(self.script_exit_file, 'w') start_time=time.time() #print "start time",start_time try: prods_collection,exit_status=self._run(args,kwargs) except Exception, e: exception_mess=e exit_status='FAILED' print("Failed script:", self.name) prods_collection=None print (traceback.format_exc()) #print('CICCIO'),self.script_exit_f, exit_status print(exit_status,file=self.script_exit_f) print(exception_mess,file=self.script_exit_f) if self.cluster_submit==True: self.write_job_exit_file(exit_status,exception_mess) self.script_exit_f.close() #print "stop time",time.time() print("exec time",time.time()-start_time) return prods_collection
def _run(self,*args,**kwargs): out_images_list=[] for in_file in self.in_files_list: print ("-> RUN") self.set_images_list(in_file) self.set_io_conf(infile=in_file) self.set_plotting() self.analysis_data_out=[] if self.input_seg_map_file is not None: self.input_seg_map=pf.getdata(self.input_seg_map_file) else: self.input_seg_map=None out_image_id=None image_id=None if self.images_list!=None: out_image_id=0 print (self.images_list) for image_id in self.images_list: print("-----------------------") print("processing image", image_id) exit_status=self._run_pipeline_func(image_id=image_id,out_image_id=out_image_id) out_image_id+=1 else: exit_status=self._run_pipeline_func(image_id=image_id,out_image_id=out_image_id) #out_image_id+=1 print("save_products",self.save_products) if self.save_products==True: self.write_out_products() if self.hmtl_index_file!=None: write_html_page(self.hmtl_index_file,out_images_list) return self.prods_collection,exit_status def _run_pipeline_func(self,image_id=None,out_image_id=None): if image_id is not None: image=Image(self.data[image_id]) else: image=Image(self.data) if self.input_seg_map is not None: if image_id is not None: input_seg_map=self.input_seg_map[image_id] else: input_seg_map=self.input_seg_map else: input_seg_map =None print("| run func for image id",image_id) self.prods_collection=self.func(self,image,image_id,input_seg_map=input_seg_map) exit_status='SUCCESS' if self.prods_collection is not None: if out_image_id==0 or image_id is None: self.build_out_files(self.prods_collection) #print("image_id",image_id) self.assign_out_files_values(self.prods_collection,image_id=image_id,out_image_id=out_image_id) if self.check_all_processes_exit_status()!='SUCCESS': exit_status='FAILED' self.write_to_script_exit_file(image_id) return exit_status
[docs] def write_out_products(self): for out_file in self.out_files_collection.out_files_list: print("out_file",out_file,out_file.name) out_file.write_to_file() print("done")
[docs] def assign_out_files_values(self,prods_collection,image_id=None,out_image_id=None): for prod,out_file in zip(prods_collection.prod_list,self.out_files_collection.out_files_list): if hasattr(out_file,'fill')==True: #print("CICCIO",prod.name) out_file.fill(prod,out_image_id=out_image_id,image_id=image_id)
[docs] def build_out_files(self,prods_collection): """ builds the :class:`.OutFilesCollection` that stores the output data corresponding to the analysis products. These data reflect the structure of the input images, i.e. single image, list of image, image cube Parameters ---------- out_analysis_products Returns ------- """ self.out_files_collection=OutFilesCollection() for prod in prods_collection.prod_list: if isinstance(prod, AnalysisProductFitsImage): self.out_files_collection.add_out_file(OutFileFitsImage(self.out_shape,working_dir=self.working_dir,flag=self.flag,name=prod.name)) elif isinstance(prod,AnalysisProductFigure): self.out_files_collection.add_out_file(OutFileFigure(self.n_images_out,file_type=self.save_plot_type,working_dir=self.working_dir,flag=self.flag,name=prod.name)) elif isinstance(prod,AnalysisProductRecArray): self.out_files_collection.add_out_file(OutFileRecArray(dtype=prod.analysis_array_dtype,working_dir=self.working_dir,flag=self.flag,name=prod.name)) elif isinstance(prod,AnalysisProductGeneric): self.out_files_collection.add_out_file(OutFileGeneric(working_dir=self.working_dir,flag=self.flag,name=prod.name)) elif isinstance(prod,AnalysisProductCatalog): self.out_files_collection.add_out_file(OutFileCatalog(working_dir=self.working_dir,flag=self.flag,name=prod.name)) elif isinstance(prod,AnalysisProductRegionCatalog): self.out_files_collection.add_out_file(OutFileRegionCatalog(prod.catalog_text,working_dir=self.working_dir,flag=self.flag,name=prod.name)) else: raise TypeError("The analysis product is not of the expected, it is %s"%(type(prod)))
# def save_plot_func(self,out_image_id,image_id,prods): # out_file_list=[] # # for ID,fig in enumerate(prods.figs_array): # if fig!=None: # if self.save_plot_type=='pdf' : # self.pdf_out.savefig(fig) # else: # print'saving image', out_image_id,prods.figs_name_array[ID] # fig_name='%4.4d_%4.4d_'%(out_image_id,image_id)+prods.figs_name_array[ID] # out_file_name= os.path.abspath(self.working_dir)+'/'+fig_name+'_'+self.plot_file_name # fig.savefig(out_file_name) # out_file_list.append(fig_name+'_'+self.plot_file_name) # # return out_file_list
[docs] def write_job_exit_file(self,exit_status,exception_mess): f=open(self.job_exit_file, 'w') if exit_status=='SUCCESS': print (self.job_exit_file) print(exit_status,file=f) f.close() else: print(exit_status, file=f) print(exception_mess,file=f) f.close()
[docs] def write_to_script_exit_file(self,image_id): for p in self.processes_list: if hasattr(p, 'exit_status'): if p.exit_status!='SUCCESS': print('image_id ', image_id,file=self.script_exit_f) print('process: ',p.name, p.exit_status,file=self.script_exit_f) print(p.exception_mess,file=self.script_exit_f) print(file=self.script_exit_f) print(file=self.script_exit_f) print(file=self.script_exit_f)
[docs] def set_infiles_list(self): self.in_files_list=glob.glob(self.IO_conf_task.get_par_value('infile')) if len(self.in_files_list)==0: print ("infile list", self.in_files_list) print ("infile value",self.IO_conf_task.get_par_value('infile')) raise RuntimeError("file list is empty check infile value %s", self.IO_conf_task.get_par_value('infile'))
[docs] def set_images_list(self,image_file): """ get images list """ print("|set image list") self.data=pf.getdata(image_file) if self.data.ndim==3: (images_num,x,y)=self.data.shape print ("input shape is: n_images,r,c", images_num,x,y) image_ids_offset=self.IO_conf_task.get_par_value('image_ids_offset') image_id=self.IO_conf_task.get_par_value('image_id') if self.image_id_file!=None: try: image_id=pf.getdata(self.image_id_file)['original_id'] except: try: image_id=np.genfromtxt(self.image_id_file, np.int64) except: raise RuntimeError("image_id file %s, is neither txt nor fits %s, or the column original_id is not present", self.image_id_file) image_id.sort() image_id_list=[] max_image_id=self.IO_conf_task.get_par_value('max_image_id') #sine the cl paramter type is a list, if it has just #one element its converted to a scalar #this is necessary to handle the case with image_min - image_max #image_id is a scalar if np.shape(image_id)!=() and len(image_id)==1: image_id_list=[image_id[0]+image_ids_offset] #image_id is a list elif np.shape(image_id)!=() and len(image_id)>1: image_id_list=[i+image_ids_offset for i in image_id] #image_id is not provided elif image_id==None : image_id_list=[0] min_image_id=image_id_list[0] if max_image_id==-1 : max_image_id= images_num-1 elif max_image_id!=None : max_image_id=max_image_id + image_ids_offset if image_id==None and max_image_id==None: max_image_id= images_num-1 elif len(image_id_list)==1 and max_image_id==None: max_image_id=image_id_list[0] if len(image_id_list)>1: images_num=len(image_id_list) self.images_list=image_id_list else: images_num=max_image_id+1 - min_image_id self.images_list=xrange(min_image_id,max_image_id+1) #print self.image_list self.n_images_out=len(self.images_list) print ("output shape is: n_images,r,c", self.n_images_out,x,y) self.out_shape=(self.n_images_out,x,y) elif self.data.ndim==2: self.images_list=None (x,y)=self.data.shape self.image=self.data self.out_shape=(x,y) self.n_images_out=1 else: raise RuntimeError("only 2-d or 3-d array are supported")
#print "->",self.images_list #self.out_shape=(images_num,x,y)
[docs] def check_all_processes_exit_status(self): all_ok='SUCCESS' for p in self.processes_list: if hasattr(p, 'exit_status'): if p.exit_status!='SUCCESS': all_ok='FAILED' return all_ok
[docs] def set_io_conf(self,infile=None): self.save_products=self.IO_conf_task.get_par_value('save_products') self.input_seg_map_file=self.IO_conf_task.get_par_value('input_seg_map_file') #self.out_analysis_file=self.IO_conf_task.get_par_value('out_analysis_file') self.working_dir=self.IO_conf_task.get_par_value('working_dir') self.save_plot_type=self.IO_conf_task.get_par_value('save_plot_type') self.cluster_submit=self.IO_conf_task.get_par_value('cluster_submit') self.job_exit_file=self.IO_conf_task.get_par_value('job_exit_file') self.script_exit_file=self.IO_conf_task.get_par_value('script_exit_file') self.image_id_file=self.IO_conf_task.get_par_value('image_id_file') self.flag=self.IO_conf_task.get_par_value('flag') if self.working_dir!='./': mkdir(self.working_dir) #if infile!=None: # if self.outfile=='': # self.outfile=os.path.basename(infile+'_out') #if self.outfile !=None: # self.data_out=np.zeros(self.out_shape) # if self.flag!=None: # self.outfile=self.flag+'_'+self.outfile # if self.working_dir!='./': # self.outfile= self.working_dir+'/'+self.outfile #if self.out_analysis_file!=None: #self.analysis_data_out=np.zeros(self.out_shape[0],dypte=self.analysis_dtype) # if self.flag!=None: # self.out_analysis_file=self.flag+'_'+self.out_analysis_file # if self.working_dir!='./': # self.out_analysis_file= self.working_dir+'/'+self.out_analysis_file if self.cluster_submit==True: self.job_exit_file=self.working_dir+'/'+ self.job_exit_file self.script_exit_file=self.working_dir+'/'+ self.script_exit_file if self.save_plot_type is not None and infile is not None: if self.save_plot_type=='pdf': from matplotlib.backends.backend_pdf import PdfPages suffix='.pdf' else: suffix='.'+self.save_plot_type if infile!=None: #if self.outfile==None: self.plot_file_name=os.path.basename(infile)+suffix #else: # self.plot_file_name='%s.%s'%(os.path.basename(self.outfile),suffix ) if self.flag!=None: self.plot_file_name=self.flag+'_'+self.plot_file_name if self.save_plot_type=='pdf': self.plot_file_name= os.path.abspath(self.working_dir)+'/'+self.plot_file_name self.pdf_out = PdfPages(self.plot_file_name) self.hmtl_index_file=None else: self.pdf_out=None self.hmtl_index_file=os.path.abspath(self.working_dir)+'/index.html' else: self.plot_file_name=None self.pdf_out=None self.hmtl_index_file=None
[docs] def set_plotting(self): self.force_no_plot=self.IO_conf_task.get_par_value('force_no_plot') self.plot=self.IO_conf_task.get_par_value('plot') print ("-> plot",self.plot)
#def preprocess_image(self,image): # # # image=image-image.min() # image=np.float32(image) # # # return image
[docs]def foo(): pass
[docs]class DoIOConfTask(AnalysisTask): def __init__(self,name='do_IO_conf',func=foo,parser=None,process=None): super(DoIOConfTask,self).__init__(name,func=func,parser=parser,process=process) self.parameters_list.add_par('infile', type=str, help='fits file with images',default=None,add_to_parser=True ) self.parameters_list.add_par('conf_file', type=str, help='configuration input file',default=None,add_to_parser=True ) self.parameters_list.add_par('-input_seg_map_file', type=str, help='',default=None,add_to_parser=True ) self.parameters_list.add_par('-dump_conf_file', action='store_true', help='save a template conf file with, the conf_file name will be used',add_to_parser=True ) #self.parameters_list.add_par('-log_stat', help='to select log fluxes', action='store_true',add_to_parser=True) self.parameters_list.add_par('-working_dir', type=str, help='working_directory',default='./' ,add_to_parser=True) self.parameters_list.add_par('-save_products', help='save pipeline analysis products',action='store_true',add_to_parser=True ) #self.parameters_list.add_par('-outfile', type=str, help='otuput fits file with rotated galaxies images',default=None ,add_to_parser=True) #self.parameters_list.add_par('-out_analysis_file', type=str, help='otuput file analysis_tasks results',default=None,add_to_parser=True ) #group_id=self.parameters_list.add_mex_group('image_id') self.parameters_list.add_par('-image_id_file', type=str, help='fits/txt file with image ids in column wiht name original_id',default=None,add_to_parser=True ) self.parameters_list.add_par('-image_id', type=int,nargs='*' ,help='id of the image',default=None,add_to_parser=True ) self.parameters_list.add_par('-max_image_id', type=np.int, help='to process only up to max_gal_id starting from gal_id, -1 to go to the end of ids', default=None,add_to_parser=True) self.parameters_list.add_par('-image_ids_offset', type=np.int, help='offset is added to all image_ids', default=0,add_to_parser=True) #self.parameters_list.add_par('-plot', help='to plot results',action='store_true',add_to_parser=True ) self.parameters_list.add_par('-save_plot_type', type=str, help='type of saved plot',default=None, choices=['svgz','pdf'] ,add_to_parser=True) self.parameters_list.add_par('-force_no_plot', help='force no plotting',action='store_true',add_to_parser=True ) self.parameters_list.add_par('-flag', type=str, default=None,add_to_parser=True,help='''flag for output names ''') self.parameters_list.add_par('-script_exit_file',type=str, help='exit file for the scritp, with error log',default='script_exit_file',add_to_parser=True) self.parameters_list.add_par('-cluster_submit', help='flag to write cluster job exit file',action='store_true',add_to_parser=True) self.parameters_list.add_par('-job_exit_file',type=str, help='exit status file for the cluster job',default='job_exit_file',add_to_parser=True)
[docs] def run(self): pass
[docs]class IOConfProcess(AnalysisProcess): def __init__(self,name='IO_conf',func=foo,parser=None,add_plot_task=False): super(IOConfProcess,self).__init__(name,func,parser=parser,add_plot_task=add_plot_task) self.add_analysis_task(DoIOConfTask,'do_IO_conf' )