from __future__ import division, absolute_import, print_function
from ....pipeline_manager.analysis_tasks import AnalysisTask
import numpy as np
__author__ = 'andrea tramacere'
class DeblendedProducts(object):
    """Bundle a parent cluster with the children produced by deblending it.

    The constructor only stores its arguments; no validation or copying
    is performed.

    Parameters
    ----------
    parent :
        the parent cluster
    children_list :
        the children clusters found for ``parent``
    offset_x, offset_y :
        offsets stored alongside the clusters (both default to 0)
    """

    def __init__(self, parent, children_list, offset_x=0, offset_y=0):
        self.parent, self.children_list = parent, children_list
        self.offset_x, self.offset_y = offset_x, offset_y
def build_source_cluster(deblended_cluster_list, parent, offset_x=0, offset_y=0):
    """Turn every deblended child into a source cluster tied to ``parent``.

    Each child has ``[offset_x, offset_y]`` added to its ``position_array``
    (the attribute is rebound on the child itself), then its
    ``SourceCluster_factory`` is called with ``parent_ID=parent.ID``.
    A debug line is printed for every child.

    Parameters
    ----------
    deblended_cluster_list :
        children clusters to convert
    parent :
        parent cluster; only its ``ID`` is read
    offset_x, offset_y :
        offsets added to the children positions

    Returns
    -------
    list with one source cluster per child, in input order
    """
    built = []
    for idx, child in enumerate(deblended_cluster_list):
        # NOTE: the shift is written back on the input child, so calling this
        # twice on the same children applies the offset twice.
        child.position_array = child.position_array + [offset_x, offset_y]
        src = child.SourceCluster_factory(parent_ID=parent.ID)
        built.append(src)
        print("->ID,", idx, child.ID, src.cartesian.x.max(), src.cartesian.y.max())
    return built
def find_direct_children(parent_cluster, children_src_clusters_list):
    """Return the first child whose centre falls inside the parent cluster.

    The centre ``(x_c, y_c)`` of each child is passed to
    ``parent_cluster.check_is_in_cluster`` and the first element of the
    result is compared with ``True``.

    Returns
    -------
    the first matching child, or None when no child matches
    """
    for candidate in children_src_clusters_list:
        inside = parent_cluster.check_is_in_cluster(candidate.x_c, candidate.y_c)[0]
        if not (inside == True):  # equality (not truthiness) kept on purpose
            continue
        return candidate
    return None
def test_plot(parent_cluster_image, children_cluster_list, cl_off_x, cl_off_y):
    """Show the parent cluster image with the children contours drawn on top.

    Debug helper: the image comes from
    ``parent_cluster_image.get_cluster_image_array()``; each child with a
    contour is drawn in white, shifted by ``(cl_off_x, cl_off_y)``, and
    labelled with its index in ``children_cluster_list``.
    Ends with ``pylab.show()``.
    """
    from asterism.plotting.plot_tools import plot_contour, show_image
    import pylab as plt

    # only the image is used; the remaining three returned values are ignored
    image, _off_x, _off_y, _masked = parent_cluster_image.get_cluster_image_array()
    _im, ax = show_image(image)

    for idx, child in enumerate(children_cluster_list):
        contour = child.contour_shape
        if contour is None:
            continue
        plot_contour(ax, contour[:, 0] - cl_off_x, contour[:, 1] - cl_off_y, 'w')
        # NOTE(review): the annotation is assumed to belong with the contour
        # (original nesting is not recoverable from this listing) -- confirm.
        ax.annotate('ID=%d' % (idx), xy=(child.x_c - cl_off_x, child.y_c - cl_off_y), color='w')

    plt.show()
def validate_children_clusters(parent_cluster,
                               children_clusters_list,
                               children_min_frac_integ_flux,
                               children_min_frac_peak_flux,
                               children_compact_frac_size,
                               children_ext_frac_size,
                               children_bright_frac_peak_flux,
                               verbose=False):
    """Split children clusters into validated ones and orphans.

    For every child three ratios with respect to the parent are computed:
    size (``cl_len``), integrated flux (``flux.sum()``) and peak flux
    (``flux.max()``). A child is validated when at least one holds:

    * *compact*: size ratio <= ``children_compact_frac_size`` and
      peak-flux ratio > ``children_min_frac_peak_flux``
    * *extended*: size ratio >= ``children_ext_frac_size`` and
      integrated-flux ratio > ``children_min_frac_integ_flux``
    * *bright*: peak-flux ratio > ``children_bright_frac_peak_flux``

    Parameters
    ----------
    parent_cluster :
        object exposing ``cl_len`` and ``flux`` (with ``sum()``/``max()``)
    children_clusters_list :
        children exposing the same attributes
    children_min_frac_integ_flux :
        min. integrated-flux ratio for an extended child
    children_min_frac_peak_flux :
        min. peak-flux ratio for a compact child
    children_compact_frac_size :
        size ratio at or below which a child is compact
    children_ext_frac_size :
        size ratio at or above which a child is extended
    children_bright_frac_peak_flux :
        peak-flux ratio above which a child is always validated
    verbose :
        if true, print the ratios and the outcome for every child

    Returns
    -------
    validated_list, orphan_list :
        two lists preserving the input order. With fewer than two children
        both lists are empty (the single child is in neither list).
    """
    if len(children_clusters_list) < 2:
        return [], []

    # builtin float: the np.float alias no longer exists in NumPy >= 1.24
    parent_flux = float(parent_cluster.flux.sum())
    parent_peak_flux = float(parent_cluster.flux.max())
    parent_size = float(parent_cluster.cl_len)

    validated_list = []
    orphan_list = []

    print("| children clusters found", len(children_clusters_list))

    for ID, children_source_cluster in enumerate(children_clusters_list):
        frac_size = float(children_source_cluster.cl_len) / parent_size
        frac_flux = float(children_source_cluster.flux.sum()) / parent_flux
        frac_peak_flux = float(children_source_cluster.flux.max()) / parent_peak_flux

        # `c` is only a label for the verbose report; the last matching
        # category wins, while `valid` accumulates over all three tests.
        c = None
        valid = False

        # compact
        if frac_size <= children_compact_frac_size:
            c = 'compact'
            if frac_peak_flux > children_min_frac_peak_flux:
                valid = True

        # extended
        if frac_size >= children_ext_frac_size:
            c = 'extended'
            if frac_flux > children_min_frac_integ_flux:
                valid = True

        # bright
        if frac_peak_flux > children_bright_frac_peak_flux:
            c = 'bright'
            valid = True

        if valid:
            validated_list.append(children_source_cluster)
        else:
            orphan_list.append(children_source_cluster)

        if verbose:
            print("|test for subcluster", ID)
            print("|children is", c)
            print("|frac size", frac_size, children_compact_frac_size, children_ext_frac_size)
            print("|frac_flux ", frac_flux, children_min_frac_integ_flux)
            print("|frac_peak_flux ", frac_peak_flux, children_min_frac_peak_flux)
            print("|valid=", valid)
            print("|-")

    return validated_list, orphan_list
def assign_orphane_clusters(validated_denclue_clusters_list, orphan_list):
    """Merge each orphan cluster into the validated cluster closest to it.

    For every orphan, ``orphan.get_closest_cluster`` is asked to pick a
    cluster among the validated ones, and that cluster's ``merge_with`` is
    called with the orphan. The validated clusters are modified in place.

    Returns
    -------
    None
    """
    print("|merging orphagn")
    for stray in orphan_list:
        stray.get_closest_cluster(validated_denclue_clusters_list).merge_with(stray)
def deblend_parent(parent_cluster,
                   children_cluster_list,
                   validate_children,
                   children_min_frac_integ_flux,
                   children_min_frac_peak_flux,
                   children_compact_frac_size,
                   children_ext_frac_size,
                   children_bright_frac_peak_flux,
                   offset_x=0,
                   offset_y=0,
                   verbose=False,
                   plot=False):
    """Return the final list of clusters a parent is deblended into.

    Not-valid children (orphans) are removed from the initial list and
    merged into the validated ones, with the following steps:

    1) with at most one child there is nothing to deblend and
       ``[parent_cluster]`` is returned
    2) source clusters are built from the children
       (see ``build_source_cluster``)
    3) if ``validate_children`` is false, all the source clusters are
       returned as they are
    4) otherwise the direct child (the first one whose centre is within
       the parent) is identified and kept; all the other children are
       validated and split into validated and orphans
    5) if neither a direct child nor any validated child is left, the
       parent itself is returned undivided; otherwise the orphans are
       merged into their closest kept cluster and
       ``[direct + validated..]`` is returned

    Parameters
    ----------
    parent_cluster :
        the cluster being deblended
    children_cluster_list :
        candidate children clusters of the parent
    validate_children :
        if true, children are validated against the thresholds below
    children_min_frac_integ_flux :
    children_min_frac_peak_flux :
    children_compact_frac_size :
    children_ext_frac_size :
    children_bright_frac_peak_flux :
        thresholds forwarded to ``validate_children_clusters``
    offset_x :
    offset_y :
        offsets forwarded to ``build_source_cluster`` and ``test_plot``
    verbose :
        if true, print progress information
    plot :
        if ``True``, show the debug plot of the children

    Returns
    -------
    final_deblended_cluster_list :
    """
    validated_cluster_list = []
    orphan_list = []
    children_src_clusters_list = []

    if len(children_cluster_list) > 1:
        if verbose:
            print("| analysing sub clusters for parent cluster", parent_cluster.ID)

        children_src_clusters_list = build_source_cluster(children_cluster_list,
                                                          parent_cluster,
                                                          offset_x=offset_x,
                                                          offset_y=offset_y)

        if validate_children:
            direct_children_cluster = find_direct_children(parent_cluster, children_src_clusters_list)
            if direct_children_cluster is not None:
                # the direct child is always kept and is not itself validated
                children_src_clusters_list.remove(direct_children_cluster)
                validated_cluster_list = [direct_children_cluster]

            _list, orphan_list = validate_children_clusters(parent_cluster,
                                                            children_src_clusters_list,
                                                            children_min_frac_integ_flux,
                                                            children_min_frac_peak_flux,
                                                            children_compact_frac_size,
                                                            children_ext_frac_size,
                                                            children_bright_frac_peak_flux,
                                                            verbose=verbose)
            validated_cluster_list.extend(_list)

            if not validated_cluster_list:
                # No direct child and nothing validated: keep the parent as a
                # whole. The orphans are then not merged, since there is no
                # validated cluster to receive them.
                validated_cluster_list = [parent_cluster]
                orphan_list = []
        else:
            validated_cluster_list = children_src_clusters_list
            orphan_list = []
    else:
        validated_cluster_list = [parent_cluster]

    if validated_cluster_list:
        print("| children clusters validated", len(validated_cluster_list))

    if orphan_list:
        assign_orphane_clusters(validated_cluster_list, orphan_list)

    if plot is True:
        test_plot(parent_cluster, children_src_clusters_list, offset_x, offset_y)

    return validated_cluster_list
def do_deblending_validation(deblended_prod_list,
                             validate_children=False,
                             children_min_frac_integ_flux=0.1,
                             children_min_frac_peak_flux=0.1,
                             children_compact_frac_size=0.1,
                             children_ext_frac_size=0.1,
                             children_bright_frac_peak_flux=0.1,
                             verbose=False,
                             plot=False):
    """
    Entry point of the deblending validation.

    Every product in ``deblended_prod_list`` is passed to ``deblend_parent``
    (parent, children list and offsets are read from the product), the
    resulting clusters are concatenated, and their ``ID`` attributes are
    rewritten as consecutive integers starting from 0.

    Parameters
    ----------
    deblended_prod_list :
        products exposing ``parent``, ``children_list``, ``offset_x``, ``offset_y``
    validate_children
    children_min_frac_integ_flux
    children_min_frac_peak_flux
    children_compact_frac_size
    children_ext_frac_size
    children_bright_frac_peak_flux
    verbose
    plot
        all forwarded unchanged to ``deblend_parent``

    Returns
    -------
    list of the deblended clusters, renumbered in place
    """
    # options shared by every deblend_parent call
    common = dict(validate_children=validate_children,
                  children_min_frac_integ_flux=children_min_frac_integ_flux,
                  children_min_frac_peak_flux=children_min_frac_peak_flux,
                  children_compact_frac_size=children_compact_frac_size,
                  children_ext_frac_size=children_ext_frac_size,
                  children_bright_frac_peak_flux=children_bright_frac_peak_flux,
                  verbose=verbose,
                  plot=plot)

    final_list = []
    for prod in deblended_prod_list:
        final_list.extend(deblend_parent(prod.parent,
                                         prod.children_list,
                                         offset_x=prod.offset_x,
                                         offset_y=prod.offset_y,
                                         **common))

    for new_id, cluster in enumerate(final_list):
        cluster.ID = new_id

    return final_list
class DoDeblendingValidationTask(AnalysisTask):
    """Analysis task wrapping ``do_deblending_validation``.

    The constructor registers one parameter for each keyword option of
    ``do_deblending_validation``, with matching defaults.
    """

    def __init__(self, name='deblending_validation', func=do_deblending_validation, parser=None, process=None):
        super(DoDeblendingValidationTask, self).__init__(name=name, func=func, parser=parser, process=process)

        self.parameters_list.add_par('-validate_children',
                                     help='if True, then the resulting children sub-clusters are processed to be validated',
                                     action='store_true')

        # builtin float is used as the converter: the np.float alias no
        # longer exists in NumPy >= 1.24
        self.parameters_list.add_par('-children_compact_frac_size', type=float,
                                     help='children to parent fractional size in pixels, below which the children is considered *compact* ',
                                     default=0.1)

        self.parameters_list.add_par('-children_ext_frac_size', type=float,
                                     help='children to parent fractional size in pixels, above which the children is considered *extended*',
                                     default=0.1)

        # help text corrected: this threshold applies to *bright* children,
        # not to extended ones
        self.parameters_list.add_par('-children_bright_frac_peak_flux', type=float,
                                     help='min. ratio of children to parent peak flux, above which the children is considered *bright* and is always validated',
                                     default=0.1)

        self.parameters_list.add_par('-children_min_frac_integ_flux', type=float,
                                     help='min. val of the ratio of children to parent integ. flux, to validate an *extended* source children',
                                     default=0.10)

        self.parameters_list.add_par('-children_min_frac_peak_flux', type=float,
                                     help='min. ratio of children ot parent peak flux to validate a *compact* source children',
                                     default=0.10)

        self.parameters_list.add_par('-verbose', help='set verbosity on', action='store_true')
        self.parameters_list.add_par('-plot', help='plot ', action='store_true')

        self.func = func
def do_set_deblending_method(method):
    """Check that ``method`` is an allowed deblending method and return it.

    The allowed names come from
    ``DoSetDeblendingMethodTask._get_allowed_methods()``.

    Raises
    ------
    RuntimeError
        if ``method`` is not one of the allowed names
    """
    allowed_methods = DoSetDeblendingMethodTask._get_allowed_methods()

    if method in allowed_methods:
        return method

    raise RuntimeError('denclue method is %s, but allowed methods are%s'%(method,allowed_methods))
class DoSetDeblendingMethodTask(AnalysisTask):
    """Analysis task wrapping ``do_set_deblending_method``.

    Registers a single ``-method`` parameter whose help text lists the
    allowed method names.
    """

    def __init__(self, name='set_deblending_method', func=do_set_deblending_method, parser=None, process=None):
        super(DoSetDeblendingMethodTask, self).__init__(name=name, func=func, parser=parser, process=process)

        # _get_allowed_methods must be *called*: formatting the function
        # object itself would put its repr in the help text
        self.parameters_list.add_par('-method', type=str,
                                     help='allowed methods: %s' % self._get_allowed_methods(),
                                     default='denclue_watershed_deblending')

        self.func = func

    @staticmethod
    def _get_allowed_methods():
        """Return the list of accepted deblending method names."""
        return ['guass_laplace_watershed', 'denclue', 'denclue_watershed_deblending', 'no_deblending']