"""
"""
from __future__ import division, absolute_import, print_function
__author__ = 'andrea tramacere'
from scipy import ndimage
from skimage import measure
from astropy.io import fits as pf
import numpy as np
from ...plotting.plot_tools import show_image
[docs]class Image(object):
"""
The Image class is used to represent an image object. Images are converted to floats, and pixel values shifted to have
the min==0. Negative values are used to mask. This is used in rotated images, where pixels that are outside boundaries
are masked
Attributes:
_original_image (ndarray): the ndarray of the original image. This array will not be touched by any transformation.
_shape_x (float): the x shape of the image
_shape_y (float): the y image of the image
"""
def __init__(self,image_array,\
rotation_center_x=None,\
rotation_center_y=None,\
copy_original=True,\
multichannel=False,\
as_gray=True,\
masked=None,\
fill_value=0,
set_min_flux_zero=True):
"""
Parameters
----------
image_array
rotation_center_x
rotation_center_y
copy_original
multichannel
as_gray
masked
fill_value
Returns
-------
"""
if type(image_array)!=np.ndarray:
raise RuntimeError('image_array argument is not a numpy ndarray')
if image_array.ndim<1:
raise RuntimeError('the ndarray is not compatible with an image')
elif image_array.ndim>3:
raise RuntimeError('the ndarray is not compatible with an image')
self._shape=image_array.shape
self.fill_value=fill_value
self._N_channels=1
if image_array.ndim==3 and as_gray==True and image_array.shape[2]>1:
self._data=rgb2gray(image_array)
self._multichannel=False
elif image_array.ndim==2:
self._data=image_array
self._multichannel=False
else:
self._data=image_array
self._multichannel=True
self._N_channels=image_array.ndim[0]
if copy_original==True:
self._original_image=np.copy(self._data)
else:
self._original_image=None
if set_min_flux_zero==True:
if self._N_channels>1:
for i in xrange(self._N_channels):
self._data[i]=self._data[i]-self._data[i].min()
else:
self._data =self._data-self._data.min()
self._shape=self._data.shape
self._shape_y=self._shape[0]
self._shape_x=self._shape[1]
if rotation_center_x is None:
self._rotation_center_x=np.float(self._shape_x)*0.5
if rotation_center_y is None:
self._rotation_center_y=np.float(self._shape_y)*0.5
self._rotation_center=[self._rotation_center_x,self._rotation_center_y]
self._center_x=np.float(self._shape_x)*0.5
self._center_y=np.float(self._shape_y)*0.5
self._rotation_center=[self._center_x,self._center_y]
self._invalid_entry=-1.0
self.masked=masked
if masked is not None:
self._data[masked]=self.fill_value
self._transformed=False
self.build_masked_array()
#def __repr__(self,hide_values=True):
# return self.array
def _update(self):
"""
This methods updates all the image members
Returns:
"""
self._shape=self._data.shape
self._shape_y=self._shape[0]
self._shape_x=self._shape[1]
self._center_x=np.float(self._shape_x)*0.5
self._center_y=np.float(self._shape_y)*0.5
@property
def original_array(self):
"""
Returns:
the ndarray of the original image
"""
return self._original_image
@original_array.deleter
def original_array(self):
del self._original_image
def _check_shape(self):
pass
@property
def rotation_center(self):
return self._rotation_center
@property
def shape(self):
"""
method to get the image shape
Returns:
tuple: shape of the image
"""
return self._data.shape
@property
def array(self,channel=None):
"""
numpy array of the image
Args:
channel:
Returns:
"""
if self._multichannel==True:
return self._data
else:
if channel is None:
return self._data
else:
return self._data[channel]
@property
def valid_flatten(self,channel=None):
"""
flattened numpy array of the image, getting rid
of masked entries
Args:
channel:
Returns:
"""
if self._multichannel==True:
return self._masked_array.compressed()
else:
if channel is None:
return self._masked_array.compressed()
else:
return self._masked_array[channel].compressed()
[docs] def filter(self,image_filter,inplace=True,**kwg):
"""
This method is used to apply the :class:.image_processing.ImageFilter to apply a filter to the image
Paramters
---------
image_filter (:class:.image_processing.ImageFilter):
inplace (boolean): if True, the filtering is applied to the stored image array, otherwise a new image array is returned
Returns
------
"""
#check_ImageFilter(image_filter)
if inplace==True:
self.build_masked_array()
self._data= image_filter.apply(self._masked_array,**kwg)
self._transformed=True
self.build_masked_array()
self._update()
else:
self.build_masked_array()
return self.from_array(image_filter.apply(self._masked_array,**kwg))
def _make_connected_sub_stamps(self,th,min_size):
selected=self._data>th
if self.masked is not None:
selected=np.logical_and(selected,~self.masked)
label_im, nb_labels = measure.label(selected.astype(np.int0), background=0,connectivity=1,return_num=True)
# measure.label sets bkg to -1
# ndimage.find_objects use 0 as bkg
self._connected_sub_stamps=ndimage.find_objects(label_im+1)
self._N_connected_sub_stamps=len(self._connected_sub_stamps)
print("|connected sub stamps built")
print("|connected sub stamp Num",self._N_connected_sub_stamps)
def _get_connected_stamp_and_mask(self,ID,min_size=0):
stamp=self._connected_sub_stamps[ID]
x_offset=int(stamp[1].start)
y_offset=int(stamp[0].start)
dx=np.fabs(stamp[1].stop-stamp[1].start)
dy=np.fabs(stamp[0].stop-stamp[0].start)
sub_array=self._data[stamp]
if dx*dy>min_size:
return sub_array,self.masked[stamp],x_offset,y_offset
else:
return None,None,None,None
[docs] def get_lowest_flux_array_from_sub_stamp(self,sub_block_frac_size=0.1,stack_average_max_id=1,min_frac_size_valid_entries=0.5):
"""
Only sub stamps with min_frac_size_valid_entries are used
Parameters
----------
sub_block_frac_size
stack_average_max_id
Returns
-------
"""
self._make_sub_stamps_grid(sub_block_frac_size)
dt=np.dtype([('ID',np.int32),('flux',np.float32),('row',np.int64),('col',np.int32)])
flux_arr=np.zeros((self._N_sub_stamps_grid),dtype=dt)
ID=0
for ID_r in self._sub_stamp_grid_ID_row:
for ID_c in self._sub_stamp_grid_ID_col:
if float(self._get_grid_sub_stamp_valid_flatten(ID_r,ID_c).size)/float(self._get_grid_sub_stamp(ID_r,ID_c).size)>min_frac_size_valid_entries:
flux_arr[ID]=(ID,self._get_grid_sub_stamp_valid_flatten(ID_r,ID_c).sum() ,self._sub_stamp_grid_ID_row[ID_r],self._sub_stamp_grid_ID_col[ID_c])
else:
flux_arr[ID]['ID']=-1
ID+=1
flux_arr=flux_arr[flux_arr['ID']!=-1]
flux_sorted_id=np.argsort(flux_arr['flux'])
flux_arr=flux_arr[flux_sorted_id]
#get min
r=flux_arr['row'][0]
c=flux_arr['col'][0]
sub_im_flux_array=self._get_grid_sub_stamp_valid_flatten(r,c)
# sum and averages
if stack_average_max_id>1:
stack_average_max_id=max(flux_arr.size,stack_average_max_id)
for sub_i in xrange(1,stack_average_max_id):
sub_im_flux_array=np.append(sub_im_flux_array,self._get_grid_sub_stamp_valid_flatten(flux_arr['row'][sub_i],flux_arr['col'][sub_i]))
return sub_im_flux_array
def _make_sub_stamps_grid(self,sub_block_frac_size):
(y,x)=self.shape
sub_block_size=x*y*sub_block_frac_size
self._sub_stamp_side_size=np.int(np.sqrt(sub_block_size))
if self._sub_stamp_side_size<1:
self._sub_stamp_side_size=1
raise Warning('self._sub_stamp_side_size was lower then 1')
self._sub_stamp_x_grid=np.arange(0,x-self._sub_stamp_side_size,self._sub_stamp_side_size)
self._sub_stamp_y_grid=np.arange(0,y-self._sub_stamp_side_size,self._sub_stamp_side_size)
self._sub_stamp_grid_ID_col=np.arange(self._sub_stamp_x_grid.size)
self._sub_stamp_grid_ID_row=np.arange(self._sub_stamp_y_grid.size)
self._N_sub_stamps_grid=self._sub_stamp_x_grid.size*self._sub_stamp_y_grid.size
print("|sub blocks grid built")
print("|sub block side",self._sub_stamp_side_size)
print("|sub blocks Num",self._N_sub_stamps_grid)
def _get_grid_sub_stamp(self,ID_row,ID_col):
x_r = self._sub_stamp_x_grid[ID_col]
y_r = self._sub_stamp_y_grid[ID_row]
return self._data[y_r:y_r+self._sub_stamp_side_size,x_r:x_r+self._sub_stamp_side_size]
def _get_grid_sub_stamp_valid_flatten(self,ID_row,ID_col):
x_r = self._sub_stamp_x_grid[ID_col]
y_r = self._sub_stamp_y_grid[ID_row]
return self._masked_array[y_r:y_r+self._sub_stamp_side_size,x_r:x_r+self._sub_stamp_side_size].compressed()
[docs] def build_masked_array(self,trim=False):
self._data[np.isnan(self._data)]=np.inf
if self.masked is not None and self._transformed==False:
masked=np.logical_or(self.masked,np.isinf(self._data))
else:
masked=np.isinf(self._data)
if trim==True:
r,c=np.where(~masked)
r1=r.min()
r2=r.max()+1
c1=c.min()
c2=c.max()+1
self._data=self._data[r1:r2,c1:c2]
self.masked=masked[r1:r2,c1:c2]
else:
self.masked=masked
#self._data[self.masked]=self.fill_value
self._masked_array=np.ma.core.MaskedArray(data=self._data,\
mask=self.masked,\
fill_value=self.fill_value)
#def mask_above_th(self,th):
# if type(self._data)==np.ma.core.MaskedArray:
# return self._data>th
# else:
# return self._data>th
[docs] def show(self):
show_image(self._data,plot=True)
@classmethod
[docs] def from_array(cls,data):
if type(data)==np.ma.core.MaskedArray:
return cls(image_array=data.data,masked=data.mask,copy_original=True)
elif type(data)==np.ndarray:
return cls(image_array=data,copy_original=True)
@classmethod
[docs] def from_file(cls,data):
return cls(image_array=data,copy_original=True)
@classmethod
[docs] def from_fits_file(cls,file,ext=0,slice=None):
if slice is None:
image_array=pf.getdata(file,ext=ext)
else:
image_array=pf.getdata(file,ext=ext)[slice]
return cls(image_array=image_array,copy_original=True)
[docs]def test():
from asterism import data_dir
from ..image_processing.filters import GaussianFilter
from ..image_processing.geometry_transformation import RotateImage,MapToCircle
image=Image.from_fits_file(data_dir+'/many_source_field.fits')
print("shape",image.shape)
#gauss_filter=GaussianFilter(sigma=1.0)
#print("with inplace=False, the Image._data has not changed")
#image.filter(gauss_filter,inplace=True)
#image.show()
#print("with inplace=True, the Image._data has changed")
#image.filter(gauss_filter,inplace=True)
#print(image.shape)
#image.show()
#print ("flux mean",image.valid_flatten.mean())
print("rotation")
rotation=RotateImage(angle=15)
#image.geom_transformation(rotation,inplace=True)
#image.show()
map_to_circle=MapToCircle(a=1.5,b=1.0,angle=50)
image.geom_transformation(map_to_circle,inplace=True)
image.show()
#print ("flux mean",image.valid_flatten.mean())
#print(image.shape)
#image.show()
#image.filter(gauss_filter,inplace=True)
#print(image.shape)
for i in xrange(6):
image.geom_transformation(rotation,inplace=True)
#print(image.shape)
image.show()
return image