"""
Module: analysis_pipeline
========================
Overview
--------
This module provides the implementation of the base class :class:`.AnalysisPipeline`,
used to create specific pipelines that combine together different :class:`.AnalysisProcess` instances.
In the diagram the Pipeline #1 is combining different :class:`.AnalysisProcess` P1...Pn.
See :class:`.AnalysisPipeline` for more details.
.. graphviz::
digraph p {
subgraph cluster0 {
node [style=filled];
"Process 1" -> "Process 2"->"Process 3"->"P..."->"Process n";
"Process 3" -> "Process 1";
label = "Pipeline 1";
}
subgraph cluster1 {
node [style=filled];
"Task 1" -> "Task 2"->"Task 3"->"T..."->"Task n";
"Task 3" -> "Task 1";
label = "Process 1";
}
}
Classes relations
---------------------------------------
.. figure::
:align: center
Classes and Inheritance Structure
----------------------------------------------
.. inheritance-diagram::
Summary
---------
.. autosummary::
AnalysisProducts
AnalysisPipeline
IOConfProcess
DoIOConfTask
Module API
-----------
"""
from __future__ import division, absolute_import,print_function
from .analysis_tasks import AnalysisTask
from .analysis_processes import AnalysisProcess
from .processing_tools import *
from .analysis_products import *
import glob
from astropy.io import fits as pf
import os
import numpy as np
import traceback
from ..core.image_manager.image import Image
import time
__author__ = 'andrea tramacere'
class AnalysisPipeline(object):
    """
    Base class used to build analysis pipelines.

    A pipeline chains together :class:`.AnalysisProcess` instances and runs
    the user-supplied ``func`` on each input image. An I/O configuration
    process (``do_IO_conf``) is always added at construction time.

    Parameters
    ----------
    name : str
        name of the pipeline
    func : callable
        analysis function executed for each image; called as
        ``func(pipeline, image, image_id, input_seg_map=...)``
    plot_func : callable, optional
        plotting function
    parser : optional
        command-line parser shared with the processes
    argv : list, optional
        command-line arguments
    conf_file : str, optional
        path to a configuration file
    """

    def __init__(self, name, func, plot_func=None, parser=None, argv=None, conf_file=None):
        self.name = name
        self.plot_func = plot_func
        self.func = func
        self.processes_list = []
        self.parser = parser
        self.argv = argv
        self.conf_file = conf_file
        # parsed command-line arguments (filled by set_pars_from_parser);
        # the original assigned self.args = None twice — deduplicated here
        self.args = None
        self.args_dict = None
        self.analysis_array_dtype = None
        # every pipeline gets the I/O configuration process by default
        self.add_analysis_process(IOConfProcess, 'do_IO_conf', add_plot_task=False)
        self.IO_conf_task = self.get_process('do_IO_conf').do_IO_conf
[docs] def get_process(self,proc_name):
found=False
for process in self.processes_list:
if proc_name==process.name:
#print "-->",par_name,par.name,par.value,getattr(par,'value')
found=True
return process
if found==False:
process=None
raise RuntimeError('process %s not found'%proc_name)
return process
[docs] def add_analysis_process(self,process_class,name,add_plot_task=True):
for process in self.processes_list:
if process.name==name:
raise ValueError('process name, already present')
print ("add process",name )
process_class_inst=process_class(name,parser=self.parser,add_plot_task=add_plot_task)
setattr(self,name,process_class_inst)
self.processes_list.append(getattr(self,name))
if self.parser!=None:
self.args= self.parser.parse_args()
#print self.args
[docs] def set_pars_from_parser(self):
self.args = self.parser.parse_args()
self.args_dict= self.args.__dict__
if self.args.dump_conf_file==False:
conf_file_lines=read_config_file(self.args.conf_file)
else:
self.dump_configuration_file(self.args.conf_file)
conf_file_lines=read_config_file(self.args.conf_file)
for process in self.processes_list:
process.set_pars_from_parser(conf_file_lines,self.args,self.argv,self.args_dict)
[docs] def set_pars_from_conf_file(self):
conf_file_lines=read_config_file(self.conf_file)
for process in self.processes_list:
process.set_pars_from_parser(conf_file_lines,self.args,self.argv,self.args_dict)
[docs] def list_parameters(self):
print()
print("|-------------------------------------------------------")
print("|Parameters for Script: ",self.name)
print("|")
print("|-------------------------------------------------------")
print ()
for process in self.processes_list:
process.list_parameters()
print ("|-------------------------------------------------------")
print()
[docs] def dump_configuration_file(self,file_name):
f=open(file_name,'w')
print(file=f)
for process in self.processes_list:
process.dump_to_conf_file(f)
print(file=f)
f.close()
[docs] def run(self,*args,**kwargs):
exception_mess=''
if self.args is not None:
if self.args.dump_conf_file==True:
return None
self.set_infiles_list()
self.set_io_conf()
#self.log_stat=self.IO_conf_task.get_par_value('log_stat')
self.script_exit_f=open(self.script_exit_file, 'w')
start_time=time.time()
#print "start time",start_time
try:
prods_collection,exit_status=self._run(args,kwargs)
except Exception, e:
exception_mess=e
exit_status='FAILED'
print("Failed script:", self.name)
prods_collection=None
print (traceback.format_exc())
#print('CICCIO'),self.script_exit_f, exit_status
print(exit_status,file=self.script_exit_f)
print(exception_mess,file=self.script_exit_f)
if self.cluster_submit==True:
self.write_job_exit_file(exit_status,exception_mess)
self.script_exit_f.close()
#print "stop time",time.time()
print("exec time",time.time()-start_time)
return prods_collection
    def _run(self, *args, **kwargs):
        """
        Loop over the input files and process each image.

        For every input file the image list and the I/O configuration are
        (re)built, then ``_run_pipeline_func`` is invoked once per selected
        image id (3-d cube input) or exactly once (2-d image input). The
        products can optionally be written to disk at the end of each file.

        Returns
        -------
        tuple
            ``(self.prods_collection, exit_status)`` of the last processed image.
        """
        out_images_list=[]
        for in_file in self.in_files_list:
            print ("-> RUN")
            self.set_images_list(in_file)
            self.set_io_conf(infile=in_file)
            self.set_plotting()
            self.analysis_data_out=[]
            # optional external segmentation map (single image or cube)
            if self.input_seg_map_file is not None:
                self.input_seg_map=pf.getdata(self.input_seg_map_file)
            else:
                self.input_seg_map=None
            out_image_id=None
            image_id=None
            if self.images_list!=None:
                # 3-d input: iterate over the selected image ids
                out_image_id=0
                print (self.images_list)
                for image_id in self.images_list:
                    print("-----------------------")
                    print("processing image", image_id)
                    exit_status=self._run_pipeline_func(image_id=image_id,out_image_id=out_image_id)
                    out_image_id+=1
            else:
                # 2-d input: single image, both ids stay None
                exit_status=self._run_pipeline_func(image_id=image_id,out_image_id=out_image_id)
            print("save_products",self.save_products)
            if self.save_products==True:
                self.write_out_products()
            # NOTE(review): 'hmtl' looks like a typo for 'html', but the
            # attribute name is used consistently elsewhere in this module
            if self.hmtl_index_file!=None:
                write_html_page(self.hmtl_index_file,out_images_list)
        return self.prods_collection,exit_status
    def _run_pipeline_func(self, image_id=None, out_image_id=None):
        """
        Run the user analysis function ``self.func`` on a single image.

        Parameters
        ----------
        image_id : int or None
            index of the image in the input cube; None for 2-d input
        out_image_id : int or None
            progressive index in the output files; None for 2-d input

        Returns
        -------
        str
            'SUCCESS', or 'FAILED' if any process reports a failure.
        """
        if image_id is not None:
            image=Image(self.data[image_id])
        else:
            image=Image(self.data)
        # pick the segmentation map slice matching the image, when available
        if self.input_seg_map is not None:
            if image_id is not None:
                input_seg_map=self.input_seg_map[image_id]
            else:
                input_seg_map=self.input_seg_map
        else:
            input_seg_map =None
        print("| run func for image id",image_id)
        self.prods_collection=self.func(self,image,image_id,input_seg_map=input_seg_map)
        exit_status='SUCCESS'
        if self.prods_collection is not None:
            # output files are created once, on the first image (or the only one)
            if out_image_id==0 or image_id is None:
                self.build_out_files(self.prods_collection)
            self.assign_out_files_values(self.prods_collection,image_id=image_id,out_image_id=out_image_id)
        if self.check_all_processes_exit_status()!='SUCCESS':
            exit_status='FAILED'
            self.write_to_script_exit_file(image_id)
        return exit_status
[docs] def write_out_products(self):
for out_file in self.out_files_collection.out_files_list:
print("out_file",out_file,out_file.name)
out_file.write_to_file()
print("done")
[docs] def assign_out_files_values(self,prods_collection,image_id=None,out_image_id=None):
for prod,out_file in zip(prods_collection.prod_list,self.out_files_collection.out_files_list):
if hasattr(out_file,'fill')==True:
#print("CICCIO",prod.name)
out_file.fill(prod,out_image_id=out_image_id,image_id=image_id)
[docs] def build_out_files(self,prods_collection):
"""
builds the :class:`.OutFilesCollection` that stores the output data corresponding
to the analysis products. These data reflect the structure of the input images,
i.e. single image, list of image, image cube
Parameters
----------
out_analysis_products
Returns
-------
"""
self.out_files_collection=OutFilesCollection()
for prod in prods_collection.prod_list:
if isinstance(prod, AnalysisProductFitsImage):
self.out_files_collection.add_out_file(OutFileFitsImage(self.out_shape,working_dir=self.working_dir,flag=self.flag,name=prod.name))
elif isinstance(prod,AnalysisProductFigure):
self.out_files_collection.add_out_file(OutFileFigure(self.n_images_out,file_type=self.save_plot_type,working_dir=self.working_dir,flag=self.flag,name=prod.name))
elif isinstance(prod,AnalysisProductRecArray):
self.out_files_collection.add_out_file(OutFileRecArray(dtype=prod.analysis_array_dtype,working_dir=self.working_dir,flag=self.flag,name=prod.name))
elif isinstance(prod,AnalysisProductGeneric):
self.out_files_collection.add_out_file(OutFileGeneric(working_dir=self.working_dir,flag=self.flag,name=prod.name))
elif isinstance(prod,AnalysisProductCatalog):
self.out_files_collection.add_out_file(OutFileCatalog(working_dir=self.working_dir,flag=self.flag,name=prod.name))
elif isinstance(prod,AnalysisProductRegionCatalog):
self.out_files_collection.add_out_file(OutFileRegionCatalog(prod.catalog_text,working_dir=self.working_dir,flag=self.flag,name=prod.name))
else:
raise TypeError("The analysis product is not of the expected, it is %s"%(type(prod)))
# def save_plot_func(self,out_image_id,image_id,prods):
# out_file_list=[]
#
# for ID,fig in enumerate(prods.figs_array):
# if fig!=None:
# if self.save_plot_type=='pdf' :
# self.pdf_out.savefig(fig)
# else:
# print'saving image', out_image_id,prods.figs_name_array[ID]
# fig_name='%4.4d_%4.4d_'%(out_image_id,image_id)+prods.figs_name_array[ID]
# out_file_name= os.path.abspath(self.working_dir)+'/'+fig_name+'_'+self.plot_file_name
# fig.savefig(out_file_name)
# out_file_list.append(fig_name+'_'+self.plot_file_name)
#
# return out_file_list
[docs] def write_job_exit_file(self,exit_status,exception_mess):
f=open(self.job_exit_file, 'w')
if exit_status=='SUCCESS':
print (self.job_exit_file)
print(exit_status,file=f)
f.close()
else:
print(exit_status, file=f)
print(exception_mess,file=f)
f.close()
[docs] def write_to_script_exit_file(self,image_id):
for p in self.processes_list:
if hasattr(p, 'exit_status'):
if p.exit_status!='SUCCESS':
print('image_id ', image_id,file=self.script_exit_f)
print('process: ',p.name, p.exit_status,file=self.script_exit_f)
print(p.exception_mess,file=self.script_exit_f)
print(file=self.script_exit_f)
print(file=self.script_exit_f)
print(file=self.script_exit_f)
[docs] def set_infiles_list(self):
self.in_files_list=glob.glob(self.IO_conf_task.get_par_value('infile'))
if len(self.in_files_list)==0:
print ("infile list", self.in_files_list)
print ("infile value",self.IO_conf_task.get_par_value('infile'))
raise RuntimeError("file list is empty check infile value %s", self.IO_conf_task.get_par_value('infile'))
[docs] def set_images_list(self,image_file):
"""
get images list
"""
print("|set image list")
self.data=pf.getdata(image_file)
if self.data.ndim==3:
(images_num,x,y)=self.data.shape
print ("input shape is: n_images,r,c", images_num,x,y)
image_ids_offset=self.IO_conf_task.get_par_value('image_ids_offset')
image_id=self.IO_conf_task.get_par_value('image_id')
if self.image_id_file!=None:
try:
image_id=pf.getdata(self.image_id_file)['original_id']
except:
try:
image_id=np.genfromtxt(self.image_id_file, np.int64)
except:
raise RuntimeError("image_id file %s, is neither txt nor fits %s, or the column original_id is not present", self.image_id_file)
image_id.sort()
image_id_list=[]
max_image_id=self.IO_conf_task.get_par_value('max_image_id')
#sine the cl paramter type is a list, if it has just
#one element its converted to a scalar
#this is necessary to handle the case with image_min - image_max
#image_id is a scalar
if np.shape(image_id)!=() and len(image_id)==1:
image_id_list=[image_id[0]+image_ids_offset]
#image_id is a list
elif np.shape(image_id)!=() and len(image_id)>1:
image_id_list=[i+image_ids_offset for i in image_id]
#image_id is not provided
elif image_id==None :
image_id_list=[0]
min_image_id=image_id_list[0]
if max_image_id==-1 :
max_image_id= images_num-1
elif max_image_id!=None :
max_image_id=max_image_id + image_ids_offset
if image_id==None and max_image_id==None:
max_image_id= images_num-1
elif len(image_id_list)==1 and max_image_id==None:
max_image_id=image_id_list[0]
if len(image_id_list)>1:
images_num=len(image_id_list)
self.images_list=image_id_list
else:
images_num=max_image_id+1 - min_image_id
self.images_list=xrange(min_image_id,max_image_id+1)
#print self.image_list
self.n_images_out=len(self.images_list)
print ("output shape is: n_images,r,c", self.n_images_out,x,y)
self.out_shape=(self.n_images_out,x,y)
elif self.data.ndim==2:
self.images_list=None
(x,y)=self.data.shape
self.image=self.data
self.out_shape=(x,y)
self.n_images_out=1
else:
raise RuntimeError("only 2-d or 3-d array are supported")
#print "->",self.images_list
#self.out_shape=(images_num,x,y)
[docs] def check_all_processes_exit_status(self):
all_ok='SUCCESS'
for p in self.processes_list:
if hasattr(p, 'exit_status'):
if p.exit_status!='SUCCESS':
all_ok='FAILED'
return all_ok
    def set_io_conf(self, infile=None):
        """
        Read all the I/O parameters from the IO_conf task and configure the
        working directory, the cluster/script exit files and the plot output
        (one multi-page pdf, or per-image files plus an html index).

        Parameters
        ----------
        infile : str, optional
            current input file name, used to derive the plot file name
        """
        self.save_products=self.IO_conf_task.get_par_value('save_products')
        self.input_seg_map_file=self.IO_conf_task.get_par_value('input_seg_map_file')
        self.working_dir=self.IO_conf_task.get_par_value('working_dir')
        self.save_plot_type=self.IO_conf_task.get_par_value('save_plot_type')
        self.cluster_submit=self.IO_conf_task.get_par_value('cluster_submit')
        self.job_exit_file=self.IO_conf_task.get_par_value('job_exit_file')
        self.script_exit_file=self.IO_conf_task.get_par_value('script_exit_file')
        self.image_id_file=self.IO_conf_task.get_par_value('image_id_file')
        self.flag=self.IO_conf_task.get_par_value('flag')
        # create the working directory when it is not the current one
        # NOTE(review): mkdir presumably comes from the processing_tools star
        # import — confirm it tolerates an already-existing directory
        if self.working_dir!='./':
            mkdir(self.working_dir)
        # exit files are relocated inside the working directory for cluster jobs
        if self.cluster_submit==True:
            self.job_exit_file=self.working_dir+'/'+ self.job_exit_file
            self.script_exit_file=self.working_dir+'/'+ self.script_exit_file
        if self.save_plot_type is not None and infile is not None:
            if self.save_plot_type=='pdf':
                # local import: matplotlib is only needed when saving pdf plots
                from matplotlib.backends.backend_pdf import PdfPages
                suffix='.pdf'
            else:
                suffix='.'+self.save_plot_type
            # redundant re-check of infile, kept from the original
            if infile!=None:
                self.plot_file_name=os.path.basename(infile)+suffix
                if self.flag!=None:
                    self.plot_file_name=self.flag+'_'+self.plot_file_name
                if self.save_plot_type=='pdf':
                    # one multi-page pdf collecting all the figures of this file
                    self.plot_file_name= os.path.abspath(self.working_dir)+'/'+self.plot_file_name
                    self.pdf_out = PdfPages(self.plot_file_name)
                    self.hmtl_index_file=None
                else:
                    # per-image plot files, indexed by an html page
                    # NOTE(review): 'hmtl' looks like a typo for 'html', but the
                    # attribute name is used consistently in this module
                    self.pdf_out=None
                    self.hmtl_index_file=os.path.abspath(self.working_dir)+'/index.html'
        else:
            # plotting disabled: no plot file, no pdf, no html index
            self.plot_file_name=None
            self.pdf_out=None
            self.hmtl_index_file=None
[docs] def set_plotting(self):
self.force_no_plot=self.IO_conf_task.get_par_value('force_no_plot')
self.plot=self.IO_conf_task.get_par_value('plot')
print ("-> plot",self.plot)
#def preprocess_image(self,image):
#
#
# image=image-image.min()
# image=np.float32(image)
#
#
# return image
class DoIOConfTask(AnalysisTask):
    """
    Task holding the I/O configuration parameters shared by every pipeline:
    input files, working directory, image-id selection, plotting options
    and cluster/script exit files.
    """

    def __init__(self, name='do_IO_conf', func=foo, parser=None, process=None):
        super(DoIOConfTask, self).__init__(name, func=func, parser=parser, process=process)
        self.parameters_list.add_par('infile', type=str, help='fits file with images', default=None, add_to_parser=True)
        self.parameters_list.add_par('conf_file', type=str, help='configuration input file', default=None, add_to_parser=True)
        self.parameters_list.add_par('-input_seg_map_file', type=str, help='', default=None, add_to_parser=True)
        self.parameters_list.add_par('-dump_conf_file', action='store_true', help='save a template conf file with, the conf_file name will be used', add_to_parser=True)
        self.parameters_list.add_par('-working_dir', type=str, help='working_directory', default='./', add_to_parser=True)
        self.parameters_list.add_par('-save_products', help='save pipeline analysis products', action='store_true', add_to_parser=True)
        self.parameters_list.add_par('-image_id_file', type=str, help='fits/txt file with image ids in column wiht name original_id', default=None, add_to_parser=True)
        self.parameters_list.add_par('-image_id', type=int, nargs='*', help='id of the image', default=None, add_to_parser=True)
        # type=int replaces np.int: the alias was deprecated in numpy 1.20 and
        # removed in numpy 1.24, where it raises AttributeError
        self.parameters_list.add_par('-max_image_id', type=int, help='to process only up to max_gal_id starting from gal_id, -1 to go to the end of ids', default=None, add_to_parser=True)
        self.parameters_list.add_par('-image_ids_offset', type=int, help='offset is added to all image_ids', default=0, add_to_parser=True)
        self.parameters_list.add_par('-save_plot_type', type=str, help='type of saved plot', default=None, choices=['svgz', 'pdf'], add_to_parser=True)
        self.parameters_list.add_par('-force_no_plot', help='force no plotting', action='store_true', add_to_parser=True)
        self.parameters_list.add_par('-flag', type=str, default=None, add_to_parser=True, help='''flag for output names ''')
        self.parameters_list.add_par('-script_exit_file', type=str, help='exit file for the scritp, with error log', default='script_exit_file', add_to_parser=True)
        self.parameters_list.add_par('-cluster_submit', help='flag to write cluster job exit file', action='store_true', add_to_parser=True)
        self.parameters_list.add_par('-job_exit_file', type=str, help='exit status file for the cluster job', default='job_exit_file', add_to_parser=True)
class IOConfProcess(AnalysisProcess):
    """
    Process wrapping :class:`DoIOConfTask`; automatically added to every
    :class:`AnalysisPipeline` under the name 'do_IO_conf'.
    """

    def __init__(self,name='IO_conf',func=foo,parser=None,add_plot_task=False):
        super(IOConfProcess,self).__init__(name,func,parser=parser,add_plot_task=add_plot_task)
        # single task exposing all the I/O configuration parameters
        self.add_analysis_task(DoIOConfTask,'do_IO_conf' )