Source code for asterism.core.image_manager.image

"""

"""
from __future__ import division, absolute_import, print_function

__author__ = 'andrea tramacere'


from scipy import ndimage
from skimage import measure
from astropy.io import  fits as pf
import  numpy as np

from ...plotting.plot_tools import show_image


class Image(object):
    """
    Image container built on top of a numpy array.

    The pixel data are stored as floats. Unless ``set_min_flux_zero`` is
    disabled, the values are shifted so that the smallest finite value is 0.
    Entries that are NaN or infinite are treated as invalid: they are masked
    in the internal masked array, together with any user supplied mask.

    Attributes:
        _original_image (ndarray): copy of the image data made at construction
            time (``None`` when ``copy_original`` is false). It is not touched
            by any later transformation.
        _shape_x (int): size of the image along the second array axis
        _shape_y (int): size of the image along the first array axis
    """

    def __init__(self, image_array,
                 rotation_center_x=None,
                 rotation_center_y=None,
                 copy_original=True,
                 multichannel=False,
                 as_gray=True,
                 masked=None,
                 fill_value=0,
                 set_min_flux_zero=True):
        """
        Parameters
        ----------
        image_array : numpy.ndarray
            2-D image, or 3-D image with the channels on the last axis.
        rotation_center_x, rotation_center_y : float, optional
            rotation center; each defaults to half of the image size along
            the corresponding axis.
        copy_original : bool
            if true, a copy of the (float) image data is kept in
            ``_original_image``.
        multichannel : bool
            accepted for backward compatibility, currently not used.
        as_gray : bool
            if true, a 3-D array with more than one channel is converted to
            gray scale with ``skimage.color.rgb2gray``.
        masked : boolean ndarray, optional
            entries to mask; the corresponding pixels are set to
            ``fill_value``.
        fill_value : scalar
            value written in the masked pixels.
        set_min_flux_zero : bool
            if true, the data are shifted so that the lowest finite value
            (per channel, for multi-channel images) is 0.

        Raises
        ------
        RuntimeError
            if ``image_array`` is not a plain ndarray, or is not 2-D/3-D.

        Notes
        -----
        Float input that needs no conversion is not copied, so masking and
        NaN handling may write into the caller's array when
        ``set_min_flux_zero`` is false.
        """
        # Exact type check on purpose: ndarray subclasses such as MaskedArray
        # have to go through `from_array`, which splits data and mask.
        if type(image_array) is not np.ndarray:
            raise RuntimeError('image_array argument is not a numpy ndarray')

        # 1-D input can not provide both a y and an x size.
        if image_array.ndim < 2 or image_array.ndim > 3:
            raise RuntimeError('the ndarray is not compatible with an image')

        self.fill_value = fill_value
        self._N_channels = 1

        if image_array.ndim == 3 and as_gray and image_array.shape[2] > 1:
            # local import: rgb2gray is not imported at module level
            from skimage.color import rgb2gray
            self._data = self._as_float(rgb2gray(image_array))
            self._multichannel = False
        elif image_array.ndim == 2:
            self._data = self._as_float(image_array)
            self._multichannel = False
        else:
            self._data = self._as_float(image_array)
            self._multichannel = True
            # NOTE(review): channels are assumed to be on the last axis, in
            # agreement with the as_gray test above and with
            # _shape_y/_shape_x being taken from the first two axes - confirm.
            self._N_channels = image_array.shape[2]

        if copy_original:
            self._original_image = np.copy(self._data)
        else:
            self._original_image = None

        if set_min_flux_zero:
            if self._multichannel:
                shifted = [self._shift_min_to_zero(self._data[..., ch])
                           for ch in range(self._N_channels)]
                self._data = np.stack(shifted, axis=-1)
            else:
                self._data = self._shift_min_to_zero(self._data)

        self._update()

        # An explicit rotation center is honoured; a missing coordinate falls
        # back to the geometric center of the image.
        if rotation_center_x is None:
            self._rotation_center_x = self._center_x
        else:
            self._rotation_center_x = float(rotation_center_x)
        if rotation_center_y is None:
            self._rotation_center_y = self._center_y
        else:
            self._rotation_center_y = float(rotation_center_y)
        self._rotation_center = [self._rotation_center_x,
                                 self._rotation_center_y]

        # value handed to geometric transformations as `fill`
        self._invalid_entry = -1.0

        self.masked = masked
        if masked is not None:
            self._data[masked] = self.fill_value

        self._transformed = False
        self.build_masked_array()

    @staticmethod
    def _as_float(values):
        """Return `values` unchanged if already floating point, else a float64 copy."""
        if np.issubdtype(values.dtype, np.floating):
            return values
        return values.astype(np.float64)

    @staticmethod
    def _shift_min_to_zero(values):
        """Return a new array with the lowest finite value of `values` moved to 0."""
        finite = np.isfinite(values)
        if not finite.any():
            # nothing to anchor the shift on; still hand back a new array
            return np.array(values, copy=True)
        return values - values[finite].min()

    def _update(self):
        """
        Refresh the cached shape and center members from the current data.
        """
        self._shape = self._data.shape
        self._shape_y = self._shape[0]
        self._shape_x = self._shape[1]
        self._center_x = float(self._shape_x) * 0.5
        self._center_y = float(self._shape_y) * 0.5

    @property
    def original_array(self):
        """
        Returns:
            the ndarray copied at construction time (or None)
        """
        return self._original_image

    @original_array.deleter
    def original_array(self):
        del self._original_image

    def _check_shape(self):
        pass

    @property
    def rotation_center(self):
        """
        Returns:
            list: [x, y] of the rotation center
        """
        return self._rotation_center

    @property
    def shape(self):
        """
        Returns:
            tuple: shape of the current image array
        """
        return self._data.shape

    @property
    def array(self, channel=None):
        """
        numpy array of the image

        Args:
            channel: can not be supplied through normal property access, so
                it is always None there and the full array is returned.

        Returns:
            ndarray: the current image data
        """
        if self._multichannel or channel is None:
            return self._data
        return self._data[channel]

    @property
    def valid_flatten(self, channel=None):
        """
        flattened numpy array of the image, without the masked entries

        Args:
            channel: can not be supplied through normal property access, so
                it is always None there.

        Returns:
            ndarray: 1-D array of the unmasked values
        """
        if self._multichannel or channel is None:
            return self._masked_array.compressed()
        return self._masked_array[channel].compressed()

    def filter(self, image_filter, inplace=True, **kwg):
        """
        Apply an image filter to the masked image array.

        Parameters
        ----------
        image_filter :
            object exposing ``apply(masked_array, **kwg)``
            (see :class:`.image_processing.ImageFilter`)
        inplace : bool
            if true, the filtered array replaces the stored image array and
            nothing is returned; otherwise a new :class:`Image` built from the
            filtered array is returned.
        **kwg :
            forwarded to ``image_filter.apply``

        Returns
        -------
        Image or None
        """
        self.build_masked_array()
        filtered = image_filter.apply(self._masked_array, **kwg)

        if not inplace:
            return self.from_array(filtered)

        self._data = filtered
        self._transformed = True
        self.build_masked_array()
        self._update()

    def geom_transformation(self, transformation, inplace=False, **kwg):
        """
        Apply a geometric transformation.

        Parameters
        ----------
        transformation :
            object exposing ``apply(target, fill=..., **kwg)``
        inplace : bool
            if true, the result of ``transformation.apply(self, ...)``
            replaces the stored image array and nothing is returned;
            otherwise the result of ``transformation.apply`` on the masked
            array is returned as is.
        **kwg :
            forwarded to ``transformation.apply``
        """
        self.build_masked_array()

        # NOTE(review): the in-place path passes the Image itself while the
        # other path passes the masked array; kept as in the original code -
        # confirm that the transformation classes accept both.
        if not inplace:
            return transformation.apply(self._masked_array,
                                        fill=self._invalid_entry, **kwg)

        self._data = transformation.apply(self, fill=self._invalid_entry, **kwg)
        self._transformed = True
        self.build_masked_array()
        self._update()

    def _make_connected_sub_stamps(self, th, min_size):
        """
        Find the connected regions of unmasked pixels with value above `th`.

        The bounding slices of the regions are stored in
        ``_connected_sub_stamps`` and their number in
        ``_N_connected_sub_stamps``. `min_size` is not used here; the size
        selection is done in `_get_connected_stamp_and_mask`.
        """
        selected = self._data > th
        if self.masked is not None:
            selected = np.logical_and(selected, ~self.masked)

        # ndimage.label always uses 0 for the background, which is what
        # ndimage.find_objects expects; its default structuring element has
        # connectivity 1, so no version dependent label offset is needed.
        label_im, nb_labels = ndimage.label(selected)

        self._connected_sub_stamps = ndimage.find_objects(label_im)
        self._N_connected_sub_stamps = len(self._connected_sub_stamps)
        print("|connected sub stamps built")
        print("|connected sub stamp Num", self._N_connected_sub_stamps)

    def _get_connected_stamp_and_mask(self, ID, min_size=0):
        """
        Return ``(sub_array, sub_mask, x_offset, y_offset)`` for the connected
        stamp `ID`, or four ``None`` if its bounding box area is not larger
        than `min_size`.
        """
        row_slice, col_slice = self._connected_sub_stamps[ID][:2]
        stamp = self._connected_sub_stamps[ID]

        x_offset = int(col_slice.start)
        y_offset = int(row_slice.start)
        dx = abs(col_slice.stop - col_slice.start)
        dy = abs(row_slice.stop - row_slice.start)

        if dx * dy > min_size:
            return self._data[stamp], self.masked[stamp], x_offset, y_offset
        return None, None, None, None

    def get_lowest_flux_array_from_sub_stamp(self, sub_block_frac_size=0.1,
                                             stack_average_max_id=1,
                                             min_frac_size_valid_entries=0.5):
        """
        Return the valid pixel values of the grid sub stamp(s) with the
        lowest total flux.

        Only sub stamps whose fraction of valid (unmasked) entries is larger
        than `min_frac_size_valid_entries` are used.

        Parameters
        ----------
        sub_block_frac_size : float
            area of a sub stamp as a fraction of the image area
        stack_average_max_id : int
            number of lowest-flux sub stamps whose values are concatenated;
            clipped to the number of usable sub stamps
        min_frac_size_valid_entries : float
            minimum fraction of valid entries for a sub stamp to be used

        Returns
        -------
        ndarray
            1-D array with the valid values of the selected sub stamps

        Raises
        ------
        RuntimeError
            if no sub stamp satisfies the valid entries requirement
        """
        self._make_sub_stamps_grid(sub_block_frac_size)

        dt = np.dtype([('ID', np.int32), ('flux', np.float32),
                       ('row', np.int64), ('col', np.int32)])
        flux_arr = np.zeros(self._N_sub_stamps_grid, dtype=dt)

        ID = 0
        for ID_r in self._sub_stamp_grid_ID_row:
            for ID_c in self._sub_stamp_grid_ID_col:
                valid = self._get_grid_sub_stamp_valid_flatten(ID_r, ID_c)
                full_size = self._get_grid_sub_stamp(ID_r, ID_c).size
                if float(valid.size) / float(full_size) > min_frac_size_valid_entries:
                    flux_arr[ID] = (ID, valid.sum(), ID_r, ID_c)
                else:
                    flux_arr['ID'][ID] = -1
                ID += 1

        flux_arr = flux_arr[flux_arr['ID'] != -1]
        if flux_arr.size == 0:
            raise RuntimeError('no sub stamp with enough valid entries')

        # lowest flux first
        flux_arr = flux_arr[np.argsort(flux_arr['flux'])]

        # never ask for more sub stamps than are available
        n_stack = min(flux_arr.size, max(int(stack_average_max_id), 1))

        stacked = [self._get_grid_sub_stamp_valid_flatten(flux_arr['row'][sub_i],
                                                          flux_arr['col'][sub_i])
                   for sub_i in range(n_stack)]
        return np.concatenate(stacked)

    def _make_sub_stamps_grid(self, sub_block_frac_size):
        """
        Build a grid of square sub stamps, each covering about
        `sub_block_frac_size` of the image area.

        A side smaller than one pixel is reset to 1 and a warning is issued.
        """
        import warnings

        (y, x) = self.shape[:2]

        sub_block_size = x * y * sub_block_frac_size
        self._sub_stamp_side_size = int(np.sqrt(sub_block_size))
        if self._sub_stamp_side_size < 1:
            self._sub_stamp_side_size = 1
            warnings.warn('self._sub_stamp_side_size was lower then 1')

        side = self._sub_stamp_side_size
        # "+1" keeps the last tile when it fits exactly inside the image
        self._sub_stamp_x_grid = np.arange(0, x - side + 1, side)
        self._sub_stamp_y_grid = np.arange(0, y - side + 1, side)

        self._sub_stamp_grid_ID_col = np.arange(self._sub_stamp_x_grid.size)
        self._sub_stamp_grid_ID_row = np.arange(self._sub_stamp_y_grid.size)
        self._N_sub_stamps_grid = self._sub_stamp_x_grid.size * self._sub_stamp_y_grid.size

        print("|sub blocks grid built")
        print("|sub block side", self._sub_stamp_side_size)
        print("|sub blocks Num", self._N_sub_stamps_grid)

    def _get_grid_sub_stamp(self, ID_row, ID_col):
        """Return the data of the grid sub stamp at (`ID_row`, `ID_col`)."""
        x_r = self._sub_stamp_x_grid[ID_col]
        y_r = self._sub_stamp_y_grid[ID_row]
        side = self._sub_stamp_side_size
        return self._data[y_r:y_r + side, x_r:x_r + side]

    def _get_grid_sub_stamp_valid_flatten(self, ID_row, ID_col):
        """Return the unmasked values of the grid sub stamp, flattened."""
        x_r = self._sub_stamp_x_grid[ID_col]
        y_r = self._sub_stamp_y_grid[ID_row]
        side = self._sub_stamp_side_size
        return self._masked_array[y_r:y_r + side, x_r:x_r + side].compressed()

    def build_masked_array(self, trim=False):
        """
        (Re)build the mask and the masked array from the current data.

        NaN entries are replaced in place by ``inf`` and every infinite entry
        is masked. The user mask is combined with it only as long as the
        image has not been transformed.

        Parameters
        ----------
        trim : bool
            if true, data and mask are cropped to the bounding box of the
            valid entries (skipped when there is no valid entry).
        """
        self._data[np.isnan(self._data)] = np.inf

        if self.masked is not None and not self._transformed:
            masked = np.logical_or(self.masked, np.isinf(self._data))
        else:
            masked = np.isinf(self._data)

        valid = ~masked
        if trim and valid.any():
            r, c = np.nonzero(valid)[:2]
            r1, r2 = r.min(), r.max() + 1
            c1, c2 = c.min(), c.max() + 1
            self._data = self._data[r1:r2, c1:c2]
            masked = masked[r1:r2, c1:c2]
            # keep the cached shape/center members in sync with the crop
            self._update()

        self.masked = masked
        self._masked_array = np.ma.MaskedArray(data=self._data,
                                               mask=self.masked,
                                               fill_value=self.fill_value)

    def show(self):
        """Display the current image array with `show_image`."""
        show_image(self._data, plot=True)

    @classmethod
    def from_array(cls, data):
        """
        Build an Image from an ndarray or from a numpy masked array.

        For a masked array the mask is passed to the constructor as
        `masked`. Any other type is handed to the constructor, which raises
        RuntimeError.
        """
        if isinstance(data, np.ma.MaskedArray):
            # getmaskarray always gives a full boolean array, also for nomask
            return cls(image_array=np.asarray(data.data),
                       masked=np.ma.getmaskarray(data),
                       copy_original=True)
        return cls(image_array=data, copy_original=True)

    @classmethod
    def from_file(cls, data):
        """
        Build an Image passing `data` to the constructor as `image_array`.

        NOTE(review): despite the name, no file is read here - confirm the
        intended behaviour.
        """
        return cls(image_array=data, copy_original=True)

    @classmethod
    def from_fits_file(cls, file, ext=0, slice=None):
        """
        Build an Image from the data of extension `ext` of a FITS file,
        optionally indexed with `slice`.
        """
        image_array = pf.getdata(file, ext=ext)
        if slice is not None:
            image_array = image_array[slice]
        return cls(image_array=image_array, copy_original=True)
def test():
    """
    Manual smoke test: load the example FITS field shipped with the package,
    map it to a circle, then rotate it six times, showing the image after
    each step.

    Returns:
        Image: the transformed image
    """
    import os

    from asterism import data_dir
    from ..image_processing.filters import GaussianFilter
    from ..image_processing.geometry_transformation import RotateImage, MapToCircle

    image = Image.from_fits_file(os.path.join(data_dir, 'many_source_field.fits'))
    print("shape", image.shape)

    # Optional filtering demo, kept disabled as in the original code:
    # gauss_filter = GaussianFilter(sigma=1.0)
    # image.filter(gauss_filter, inplace=True)
    # image.show()
    # print("flux mean", image.valid_flatten.mean())

    print("rotation")
    rotation = RotateImage(angle=15)

    map_to_circle = MapToCircle(a=1.5, b=1.0, angle=50)
    image.geom_transformation(map_to_circle, inplace=True)
    image.show()

    # range instead of the Python 2 only xrange
    for i in range(6):
        image.geom_transformation(rotation, inplace=True)
        # NOTE(review): the original indentation was lost; show() is assumed
        # to belong to the loop body - confirm.
        image.show()

    return image