Source code for asterism.analysis_tasks.source_detection.deblending.deblending


from __future__ import division, absolute_import, print_function
from ....pipeline_manager.analysis_tasks import AnalysisTask

import numpy as np

__author__ = 'andrea tramacere'



class DeblendedProducts(object):
    """Container pairing a parent cluster with its candidate children.

    The constructor stores its four arguments unchanged as attributes of
    the same name: ``parent``, ``children_list``, ``offset_x`` and
    ``offset_y``.
    """

    def __init__(self, parent, children_list, offset_x=0, offset_y=0):
        # Offsets default to 0, i.e. no shift applied to the children.
        self.offset_x, self.offset_y = offset_x, offset_y
        self.parent, self.children_list = parent, children_list
def build_source_cluster(deblended_cluster_list, parent, offset_x=0, offset_y=0):
    """Shift every child cluster and turn it into a source cluster.

    For each element of ``deblended_cluster_list`` the ``position_array``
    attribute is replaced by ``position_array + [offset_x, offset_y]``
    (the input objects are therefore modified), then
    ``SourceCluster_factory(parent_ID=parent.ID)`` is called on it.
    A diagnostic line is printed for every cluster built.

    Returns
    -------
    list
        The objects returned by ``SourceCluster_factory``, in input order.
    """
    shift = [offset_x, offset_y]
    built = []
    for index, child in enumerate(deblended_cluster_list):
        # Rebind (not in-place add) so the previous array object is left alone.
        child.position_array = child.position_array + shift
        source = child.SourceCluster_factory(parent_ID=parent.ID)
        built.append(source)
        print("->ID,", index, child.ID, source.cartesian.x.max(), source.cartesian.y.max())
    return built
def find_direct_children(parent_cluster, children_src_clusters_list):
    """Return the first child whose centre lies inside the parent cluster.

    A child qualifies when the first element returned by
    ``parent_cluster.check_is_in_cluster(child.x_c, child.y_c)`` compares
    equal to ``True``. Children are tested in list order and the search
    stops at the first hit.

    Returns
    -------
    The matching child, or ``None`` when no child qualifies.
    """
    hits = (
        child
        for child in children_src_clusters_list
        # Equality (not identity/truthiness) is kept on purpose so that the
        # accepted values stay exactly those of the historical check.
        if parent_cluster.check_is_in_cluster(child.x_c, child.y_c)[0] == True  # noqa: E712
    )
    return next(hits, None)
def test_plot(parent_cluster_image, children_cluster_list, cl_off_x, cl_off_y):
    """Show the parent cluster image with the children contours overlaid.

    The image comes from ``parent_cluster_image.get_cluster_image_array()``.
    Every child having a non-``None`` ``contour_shape`` is drawn in white,
    shifted by ``(-cl_off_x, -cl_off_y)``, and labelled with its index in
    ``children_cluster_list``. The call ends with ``pylab.show()``.
    """
    import pylab as plt
    from asterism.plotting.plot_tools import plot_contour, show_image

    # Only the image is used; the remaining three returned values are ignored.
    image, _off_x, _off_y, _masked = parent_cluster_image.get_cluster_image_array()
    _im, ax = show_image(image)

    for index, child in enumerate(children_cluster_list):
        contour = child.contour_shape
        if contour is None:
            continue
        # NOTE(review): the label is assumed to be drawn only for children
        # that have a contour — confirm against the upstream layout.
        plot_contour(ax, contour[:, 0] - cl_off_x, contour[:, 1] - cl_off_y, 'w')
        label_xy = (child.x_c - cl_off_x, child.y_c - cl_off_y)
        ax.annotate('ID=%d' % (index), xy=label_xy, color='w')

    plt.show()
def validate_children_clusters(parent_cluster,
                               children_clusters_list,
                               children_min_frac_integ_flux,
                               children_min_frac_peak_flux,
                               children_compact_frac_size,
                               children_ext_frac_size,
                               children_bright_frac_peak_flux,
                               verbose=False):
    """Split children clusters into validated ones and orphans.

    Each child is compared with the parent through three ratios:
    size (``cl_len``), integrated flux (``flux.sum()``) and peak flux
    (``flux.max()``). A child is validated when at least one holds:

    * *compact*: size ratio <= ``children_compact_frac_size`` and
      peak-flux ratio > ``children_min_frac_peak_flux``;
    * *extended*: size ratio >= ``children_ext_frac_size`` and
      integrated-flux ratio > ``children_min_frac_integ_flux``;
    * *bright*: peak-flux ratio > ``children_bright_frac_peak_flux``.

    Parameters
    ----------
    parent_cluster :
        Object exposing ``flux`` (with ``sum()``/``max()``) and ``cl_len``.
    children_clusters_list : list
        Children exposing the same attributes.
    children_min_frac_integ_flux, children_min_frac_peak_flux,
    children_compact_frac_size, children_ext_frac_size,
    children_bright_frac_peak_flux : float
        Thresholds used as described above.
    verbose : bool
        When ``True`` the ratios and the verdict are printed per child.

    Returns
    -------
    validated_list, orphan_list : (list, list)
        Both lists are empty when fewer than two children are given; in
        that case no child is classified at all.
    """
    # Builtin ``float`` replaces the ``np.float`` alias (same type), which
    # was removed from NumPy 1.24 and would raise AttributeError there.
    parent_flux = float(parent_cluster.flux.sum())
    parent_peak_flux = float(parent_cluster.flux.max())
    parent_size = float(parent_cluster.cl_len)

    if len(children_clusters_list) < 2:
        return [], []

    print("| children clusters found", len(children_clusters_list))

    validated_list = []
    orphan_list = []

    for ID, children_source_cluster in enumerate(children_clusters_list):
        frac_size = float(children_source_cluster.cl_len) / parent_size
        frac_flux = float(children_source_cluster.flux.sum()) / parent_flux
        frac_peak_flux = float(children_source_cluster.flux.max()) / parent_peak_flux

        # ``c`` is only a label for the verbose report; the later tests
        # overwrite the earlier ones, so it names the last matching class.
        c = None
        valid = False

        # compact
        if frac_size <= children_compact_frac_size:
            c = 'compact'
            if frac_peak_flux > children_min_frac_peak_flux:
                valid = True

        # extended
        if frac_size >= children_ext_frac_size:
            c = 'extended'
            if frac_flux > children_min_frac_integ_flux:
                valid = True

        # bright: validated regardless of the size class
        if frac_peak_flux > children_bright_frac_peak_flux:
            c = 'bright'
            valid = True

        if valid:
            validated_list.append(children_source_cluster)
        else:
            orphan_list.append(children_source_cluster)

        if verbose is True:
            print("|test for subcluster", ID)
            print("|children is", c)
            print("|frac size", frac_size, children_compact_frac_size, children_ext_frac_size)
            print("|frac_flux ", frac_flux, children_min_frac_integ_flux)
            print("|frac_peak_flux ", frac_peak_flux, children_min_frac_peak_flux)
            print("|valid=", valid)
            print("|-")

    return validated_list, orphan_list
def assign_orphane_clusters(validated_denclue_clusters_list, orphan_list):
    """Merge every orphan into a validated cluster.

    For each orphan, ``orphan.get_closest_cluster(validated_denclue_clusters_list)``
    selects the target, and ``target.merge_with(orphan)`` is then called on it.
    Orphans are processed in list order. A progress line is printed once.

    Returns
    -------
    None
    """
    print("|merging orphagn")
    for stray in orphan_list:
        target = stray.get_closest_cluster(validated_denclue_clusters_list)
        target.merge_with(stray)
def deblend_parent(parent_cluster,
                   children_cluster_list,
                   validate_children,
                   children_min_frac_integ_flux,
                   children_min_frac_peak_flux,
                   children_compact_frac_size,
                   children_ext_frac_size,
                   children_bright_frac_peak_flux,
                   offset_x=0,
                   offset_y=0,
                   verbose=False,
                   plot=False):
    """
    Build the final list of clusters replacing one parent cluster.

    Steps:

    1) if ``children_cluster_list`` holds more than one element, source
       clusters are built with ``build_source_cluster`` (offsets applied);
       otherwise ``[parent_cluster]`` is returned as is.
    2) if ``validate_children`` is True, the direct child (first child whose
       centre is inside the parent, see ``find_direct_children``) is taken out
       of the list and becomes the first validated cluster.
    3) the remaining children go through ``validate_children_clusters`` and
       are split into validated children and orphans.
    4) if ``validate_children`` is not True, every built child is accepted
       and there are no orphans.
    5) orphans, if any, are merged into the validated clusters with
       ``assign_orphane_clusters``.

    Parameters
    ----------
    parent_cluster :
        the cluster being deblended
    children_cluster_list :
        candidate sub-clusters of the parent
    validate_children :
        if True, children are validated against the thresholds below
    children_min_frac_integ_flux :
    children_min_frac_peak_flux :
    children_compact_frac_size :
    children_ext_frac_size :
    children_bright_frac_peak_flux :
        thresholds forwarded unchanged to ``validate_children_clusters``
    offset_x :
    offset_y :
        offsets forwarded to ``build_source_cluster`` and ``test_plot``
    verbose :
        if True, progress information is printed
    plot :
        if True, ``test_plot`` is called before returning

    Returns
    -------
    validated_cluster_list :
        list of the validated clusters

    """
    validated_cluster_list=[]
    orphan_list=[]
    children_src_clusters_list=[]

    if len(children_cluster_list)>1:

        if verbose==True:
            print ("| analysing sub clusters for parent cluster",parent_cluster.ID)

        children_src_clusters_list=build_source_cluster(children_cluster_list,parent_cluster,offset_x=offset_x,offset_y=offset_y)

        if validate_children == True:

            direct_children_cluster=find_direct_children(parent_cluster,children_src_clusters_list)

            if direct_children_cluster is not None:
                # The direct child is removed in place, so it is neither
                # re-validated below nor present in the list handed to
                # test_plot at the end.
                children_src_clusters_list.remove(direct_children_cluster)
                validated_cluster_list=[direct_children_cluster]

            _list,orphan_list= validate_children_clusters(parent_cluster,
                                                          children_src_clusters_list,
                                                          children_min_frac_integ_flux,
                                                          children_min_frac_peak_flux,
                                                          children_compact_frac_size,
                                                          children_ext_frac_size,
                                                          children_bright_frac_peak_flux,
                                                          verbose=verbose)

            validated_cluster_list.extend(_list)

        else:
            # No validation requested: the returned list is the very same
            # list object as children_src_clusters_list.
            validated_cluster_list=children_src_clusters_list
            orphan_list=[]

    else:
        validated_cluster_list = [parent_cluster]

    if len(validated_cluster_list)>0:
        print ("| children clusters validated",len(validated_cluster_list))

    if len(orphan_list)>0:
        # NOTE(review): with no direct child and no validated child,
        # validated_cluster_list is empty here while orphan_list is not —
        # confirm get_closest_cluster copes with an empty target list.
        assign_orphane_clusters(validated_cluster_list, orphan_list)

    if plot is True:
        test_plot(parent_cluster,children_src_clusters_list,offset_x,offset_y)

    return validated_cluster_list
def do_deblending_validation(deblended_prod_list,
                             validate_children=False,
                             children_min_frac_integ_flux=0.1,
                             children_min_frac_peak_flux=0.1,
                             children_compact_frac_size=0.1,
                             children_ext_frac_size=0.1,
                             children_bright_frac_peak_flux=0.1,
                             verbose=False,
                             plot=False):
    """
    Top level function for the deblending validation.

    ``deblend_parent`` is run on every product of ``deblended_prod_list``
    (using its ``parent``, ``children_list``, ``offset_x`` and ``offset_y``),
    the resulting lists are concatenated, and the ``ID`` attribute of each
    resulting cluster is overwritten with its position in the final list.

    Parameters
    ----------
    deblended_prod_list :
        iterable of products exposing ``parent``, ``children_list``,
        ``offset_x`` and ``offset_y``
    validate_children
    children_min_frac_integ_flux
    children_min_frac_peak_flux
    children_compact_frac_size
    children_ext_frac_size
    children_bright_frac_peak_flux
    verbose
    plot
        forwarded unchanged to ``deblend_parent``

    Returns
    -------
    final_list : list
        all the clusters returned by ``deblend_parent``, re-numbered from 0
    """
    # Options shared by every deblend_parent call.
    shared_options = dict(validate_children=validate_children,
                          children_min_frac_integ_flux=children_min_frac_integ_flux,
                          children_min_frac_peak_flux=children_min_frac_peak_flux,
                          children_compact_frac_size=children_compact_frac_size,
                          children_ext_frac_size=children_ext_frac_size,
                          children_bright_frac_peak_flux=children_bright_frac_peak_flux,
                          verbose=verbose,
                          plot=plot)

    final_list = []
    for product in deblended_prod_list:
        final_list.extend(deblend_parent(product.parent,
                                         product.children_list,
                                         offset_x=product.offset_x,
                                         offset_y=product.offset_y,
                                         **shared_options))

    # IDs are made unique and contiguous across all parents.
    for new_id, deblended_cluster in enumerate(final_list):
        deblended_cluster.ID = new_id

    return final_list
class DoDeblendingValidationTask(AnalysisTask):
    """Analysis task wrapping ``do_deblending_validation``.

    The constructor declares, through ``self.parameters_list.add_par``, one
    parameter for each keyword option of ``do_deblending_validation``.
    """

    def __init__(self, name='deblending_validation', func=do_deblending_validation, parser=None, process=None):
        super(DoDeblendingValidationTask, self).__init__(name=name, func=func, parser=parser, process=process)

        self.parameters_list.add_par('-validate_children',
                                     help='if True, then the resulting children sub-clusters are processed to be validated',
                                     action='store_true')

        # Builtin ``float`` is used as the parameter type: the ``np.float``
        # alias (which was the same type) no longer exists in NumPy >= 1.24.
        self.parameters_list.add_par('-children_compact_frac_size',
                                     type=float,
                                     help="""children to parent fractional size in pixels, below which the children is considered *compact* """,
                                     default=0.1)

        self.parameters_list.add_par('-children_ext_frac_size',
                                     type=float,
                                     help='children to parent fractional size in pixels, above which the children is considered *extended*',
                                     default=0.1)

        # Help text describes the threshold as used by the validation code:
        # a child above this peak-flux ratio is tagged *bright* and validated.
        self.parameters_list.add_par('-children_bright_frac_peak_flux',
                                     type=float,
                                     help='min. ratio of children to parent peak flux, above which the children is considered *bright* and always validated',
                                     default=0.1)

        self.parameters_list.add_par('-children_min_frac_integ_flux',
                                     type=float,
                                     help='min. val of the ratio of children to parent integ. flux, to validate an *extended* source children',
                                     default=0.10)

        self.parameters_list.add_par('-children_min_frac_peak_flux',
                                     type=float,
                                     help='min. ratio of children ot parent peak flux to validate a *compact* source children',
                                     default=0.10)

        self.parameters_list.add_par('-verbose', help='set verbosity on', action='store_true')

        self.parameters_list.add_par('-plot', help='plot ', action='store_true')

        self.func = func
def do_set_deblending_method(method,):
    """Check that ``method`` is an allowed deblending method and return it.

    The allowed names are those given by
    ``DoSetDeblendingMethodTask._get_allowed_methods()``.

    Returns
    -------
    method, unchanged.

    Raises
    ------
    RuntimeError
        if ``method`` is not one of the allowed names.
    """
    allowed_methods = DoSetDeblendingMethodTask._get_allowed_methods()

    if method in allowed_methods:
        return method

    raise RuntimeError('denclue method is %s, but allowed methods are%s'%(method,allowed_methods))
class DoSetDeblendingMethodTask(AnalysisTask):
    """Analysis task wrapping ``do_set_deblending_method``.

    Declares a single ``-method`` string parameter whose default is
    ``'denclue_watershed_deblending'``.
    """

    def __init__(self, name='set_deblending_method', func=do_set_deblending_method, parser=None, process=None):
        super(DoSetDeblendingMethodTask, self).__init__(name=name, func=func, parser=parser, process=process)

        # The helper must be *called*: formatting the bare function object
        # put its repr in the help text instead of the list of method names.
        self.parameters_list.add_par('-method',
                                     type=str,
                                     help='allowed methods: %s' % self._get_allowed_methods(),
                                     default='denclue_watershed_deblending')

        self.func = func

    @staticmethod
    def _get_allowed_methods():
        """Return the list of accepted deblending method names."""
        # NOTE(review): 'guass_laplace_watershed' looks like a misspelling of
        # 'gauss'; left untouched because other code may match this exact
        # string — confirm before renaming.
        return ['guass_laplace_watershed', 'denclue', 'denclue_watershed_deblending', 'no_deblending']