Source code for mascara.funcs.utils

# Copyright 2014 - 2015 Anna-Lea Lesage for MASCARA
#    This file is part of the mascara package.
#    mascara is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#    mascara is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    GNU General Public License for more details.
#    You should have received a copy of the GNU General Public License
#    along with mascara.  If not, see <>.

.. module:: utils
.. moduleauthor:: Anna-Lea Lesage  
Utilities functions used for MASCARA

    - rotmat, create a 3x3 (xN) rotation matrix along a specific axis
    - pol2rect, convert polar to rectangular coordinates
    - rect2pol, convert rectangular to polar coordinates
    - percentile, compute the Nth percentile of a distribution
    - makevortex, make Delauney vortices
    - match_vortex, match vortices together
    - loadStellarCatalog,
    - cleanupStellarCatalog -> remove blended stars ...
    - find_outliers, 
    - robuststd, computes the standard deviation of a distribution
    - rebin, rebins a 1D or 2D array into a new shape
    - statbin, rebins and provides statistics
    - gaussian, make a guassian
    - gfitting, fit a gaussian

__version__ = '15.10.12'

import numpy as np
import logging

[docs]class QueueHandler(logging.Handler): ''' This class is new in python 3.2 and later but has been copied here for python 2.7. It sends a logging handler to a Queue. It would be used together with a multiprocessing queue to centralise logging to file in a multi-processing configuration and to avoid write contention between processes. ''' import Queue as queue
[docs] def __init__(self, queue): ''' Initialise the instance using the passed queue. ''' logging.Handler.__init__(self) self.queue = queue
[docs] def enqueue(self, record): ''' Enqueue a record. This implementation uses the no_wait : event are put to queue as soon as possible . ''' self.queue.put_nowait(record)
[docs] def prepare(self, record): ''' Prepares a record for queueing. The object returned by this method is enqueued. ''' self.format(record) record.msg = record.message record.args = None record.exc_info = None return record
[docs] def emit(self, record): ''' Emits a record. Writes the Logrecord to queue, preparing it first. ''' try: self.enqueue(self.prepare(record)) except (KeyboardInterrupt, SystemExit): raise except: self.HandlerError(record)
[docs]class QueueListener(object): ''' Queue listener works in pair with QueueHandler for writing/printing log files using logging in the case of multi-processes. This class implements an internal thread listener which watches for LogRecords being added to Queue, removes them and passes them a list of handler for processing. ''' _sentinel = None
[docs] def __init__(self, queue, *handlers): ''' initialises an instance with the specified queue and handlers ''' import threading self.queue = queue self.handlers = handlers self._stop = threading.Event() self._thread = None
[docs] def dequeue(self, block): ''' dequeue a record, and optionally block it ''' return self.queue.get(block)
[docs] def start(self): ''' Start the listener... This starts up a background thread to monitor the queue for LogRecords to process. ''' import threading self._thread = t = threading.Thread(target = self._monitor) t. setDeamon = True t.start()
[docs] def prepare(self, record): ''' prepares a record for handling. ''' return record
[docs] def handle(self, record): ''' Handle a record. This just loops through the handlers offering them the record to handle. ''' record = self.prepare(record) for handler in self.handlers: handler.handle(record)
[docs] def _monitor(self): ''' Monitors the queue for records and ask the handler to deal with them. This method run on a separate internal thread. The thread will terminate once it sees the sentinel object in the queue. ''' import Queue as queue q = self.queue has_task_done = hasattr(q, 'task_done') while not self._stop.isSet(): try: record = self.dequeue(True) if record is self._sentinel: break self.handle(record) if has_task_done: q.task_done() except queue.Empty: pass # there may still be some records in the queue to extract. while True: try: record = self.dequeue(False) if record is self._sentinel: break self.handle(record) if has_task_done: q.task_done() except queue.Empty: break
[docs] def stop(self): ''' stop the listener. This asks the thread to terminate and wait till it does so. Note that if we don't call it before the application exits, there may still be some records inside the queue. ''' self._stop.set() self.queue.put_nowait(self._sentinel) self._thread.join() self._thread = None
[docs]class Stacker: ''' Stacker creates a stacker instance which is used to stack images on top of each other in order to bin them temporally. The stacker first align the images before stacking them. In order to do so, it requires valid site and camera instances as inputs. '''
[docs] def __init__(self, site, camera, **kwargs): ''' Initialise the Stacker Class. Inputs: - site, a Site instance - camera, a CamCoord instance Keywords: - imageshape, provide here a tuple with the image size to use if not provided, the stacker will use the image size definition from the camera instance. - sidex, sidey, two integer values to define the size of the boxes which will be moved around to realign the images. Large sidex-sidey make the stacking fast, but sensible to camera aberrations. - oversize, an integer defining the oversize factor to use while aligning the image. If the factor is too small, there is a risk of aligning outside the image deifinition frame. ''' = site = camera imageshape = kwargs.get('imageshape', None) if imageshape: self.ny, self.nx = imageshape else: self.nx = self.ny = self.sidex = kwargs.get('sidex', 32) self.sidey = kwargs.get('sidey', 32) self.oversize = kwargs.get('oversize', 100) self.texp = 0. self.makeGrid() self.makeStackingImages() return
def end(self): ''' ends the stacker class. Remove everything from memory and so on ''' self.Bimage = 0 self.referenceImage = 0 self.Nimages = 0 return
[docs] def makeGrid(self): ''' makes the grid used for rotating the images ''' gx, gy = [],[] self.tmpx = self.nx//self.sidex self.tmpy = self.ny//self.sidey resx = self.nx % self.sidex resy = self.ny % self.sidey if (resx%2 > 0) | (resy%2 > 0): print 'Uneven number of discarded pixels, please adjust patch size.' exit() gx = np.arange(self.tmpx)*self.sidex + self.sidex/2. + resx/2. gy = np.arange(self.tmpy)*self.sidey + self.sidey/2. + resy/2. gx, gy = np.meshgrid(gx, gy) self.gx = gx.ravel() = gy.ravel() self.onx = self.tmpx*self.sidex+self.oversize self.ony = self.tmpy*self.sidey+self.oversize return
def makeStackingImages(self): ## Create output matrix to be filled self.Bimage = np.zeros((self.ony,self.onx), dtype=np.float64) self.referenceImage = np.zeros((self.ony, self.onx), dtype = int) self.Nimages=0 return
[docs] def image_rotate(self, image, currenttime, TimeReference, exposuretime = False): ''' "rotate" image to new reference frame. The image is actually not rotated, but shifted in pieces to the new location for the given TimeReference. Inputs: - image, the image to 'rotate' - currenttime, when this image was taken. Currenttime can be either a datetime object or JD date - TimeReference, the new time reference for 'rotating' the image Keywords: - exposureTime, default false, else precise the exposure time of the image Outputs: - cxp, cyp, the grid coordinates at the new ReferenceTime ''' from itertools import izip phobs, thobs =, talt, taz =, thobs) tra, tec =, taz, currenttime) nalt, naz, nvis =, tec, TimeReference, \ nutation=False, precession=False, refraction=False, aberration=False) phre, thre =, naz)[0:2] cxp, cyp =, thre) cx, cy = np.around(cxp), np.around(cyp) ones = np.ones(image.shape) for i, j, k, l in izip(, self.gx, cy, cx): try: self.Bimage[self.oversize/2+k-self.sidey/2:self.oversize/2+k+self.sidey/2+1, \ self.oversize/2+l-self.sidex/2:self.oversize/2+l+self.sidex/2+1] \ += image[i-self.sidey/2:i+self.sidey/2+1,j-self.sidex/2:j+self.sidex/2+1] self.referenceImage[self.oversize/2+k-self.sidey/2:self.oversize/2+k+self.sidey/2+1, \ self.oversize/2+l-self.sidex/2:self.oversize/2+l+self.sidex/2+1] \ += ones[i-self.sidey/2:i+self.sidey/2+1,j-self.sidex/2:j+self.sidex/2+1] except ValueError: continue if exposuretime: if self.Nimages==0: self.texp = 0 else: self.texp += exposuretime self.Nimages+=1 ### release the memory by deleting the intermediates products del talt, taz, tra, tec, nalt, naz, nvis, phre, thre, phobs, thobs, cx, cy return cxp, cyp
[docs] def binImage(self, endtimestamp): ''' Bin the actual image Input: - endtimestamp, a datetime object or JD date, which is used to marked the end of this binning sequence Outputs: - binimage, the binned image - nimages, the number of images used for binning ''' output = self.Bimage[self.oversize/2:self.oversize/2+self.ny, \ self.oversize/2:self.oversize/2+self.nx] reference = self.referenceImage[self.oversize/2:self.oversize/2+self.ny, \ self.oversize/2:self.oversize/2+self.nx] output = output/reference self.endtimestamp = endtimestamp Nimages = self.Nimages self.makeStackingImages() output = np.nan_to_num(output) if Nimages>0: self.texp = self.texp/Nimages return output, Nimages ### ---------- Astrometric Conversion functions --------------###
[docs]def pol2rect(ra, dec, degree = True): ''' Convert any type of spherical coordinates in a matrix of cartesian rectangular coordinates to be used for coordinates conversion. Example: To convert (ra-dec)(J2000) to (ra-dec)(to date): >>> X = pol2rect(ra, dec) >>> Xnew = N*P*X # where N and P are the nutation and precession matrices. >>> ranew, decnew = rec2pol(Xnew) Inputs: - the 2 polar coordinates (ra,dec) - (alt-az) (phi theta) in DEGREE Output: -a matrix of cartesian coordinates ''' from mascara.constants import deg2rad #Convert the inputs into arrays: ra = np.array(ra) dec= np.array(dec) if degree: C = np.cos(dec*deg2rad) x = np.cos(ra*deg2rad) * C y = np.sin(ra*deg2rad) * C z = np.sin(dec*deg2rad) return np.matrix([x,y,z]) else: C = np.cos(dec) x = np.cos(ra)*C y = np.sin(ra)*C z = np.sin(dec) return np.matrix([x,y,z])
[docs]def rect2pol(X, Eq2AltAz=False): '''Conversion from rectangular coordinates to equatorial. Special care has to be taken around the celestial poles Inputs: - x, y, z, scalar or vector of rectangular coordinates Outputs: - ra, dec: the converted coordinates in degrees ''' from mascara.constants import rad2deg if X.size/len(X) == 1: x = np.ravel(X[0]) y = np.ravel(X[1]) z = np.ravel(X[2]) else: x = np.array(np.ravel(X[0,:]), dtype=float) y = np.array(np.ravel(X[1,:]), dtype=float) z = np.array(np.ravel(X[2,:]), dtype=float) if Eq2AltAz: r = np.sqrt(x**2 + y**2) ra = np.arctan2(y,x) dec = np.arctan2(z,r) else: r = np.sqrt(x**2 + y**2 + z**2) ra = np.arctan2(y,x) dec = np.arcsin(z, r) # Convert the angles back in degree ra = ra*rad2deg dec = dec*rad2deg return ra % 360, dec
[docs]def rotmat(angle, axis='x', degree=False): # TESTED WORKING ''' Rotmat generates a 3x3 rotation matrix for a given angle around the requested axis (x, y, or z) Input: - angle, the rotation angle Keywords: - the axis around which the rotation takes place. - degree=True if the input angles are in degree Output: - the rotation matrix ''' if degree: angle = np.radians(angle) if axis == 'x': s = np.sin(angle) c = np.cos(angle) if np.size(angle) == 1: R = np.matrix([[1, 0, 0],[0, c, s],[0, -s, c]]) else: R = np.zeros((3,3,np.size(angle))) R[0,0,:] = np.ones(np.size(angle)) R[1:,1:,:] = np.array([[c, s], [-s,c]]) return R elif axis == 'y': s = np.sin(angle) c = np.cos(angle) if np.size(c) == 1: R = np.matrix(((c,0,-s),(0,1,0),(s,0,c))) else: R = np.zeros((3,3,np.size(c))) R[0,0,:] = c R[2,0,:] = s R[0,2,:] = -s R[2,2,:] = c return R elif axis == 'z': s = np.sin(angle) c = np.cos(angle) if np.size(c) ==1: R = np.matrix(((c,s,0),(-s,c,0),(0,0,1))) else: R = np.zeros((3,3,np.size(c))) R[-1,-1,:] = np.ones(np.size(c)) R[:2,:2,:] = np.array([[c,s],[-s,c]]) return R else: raise SyntaxError('Give an axis for the rotation matrix')
[docs]def makevortex(x, y): ''' Makevortex uses a Delauney griding method to create from the input coordinates a triangular grid. Each star is matched with its closest neighboor to form a triangle. The method is used in the astrometric solution for identifying the stars together. Inputs: - x, y, 2 vectors of equal size representing the coordinates of the star. Output: - dist, a vector of the possible distances. - index, a array containing the indices of the 2 points forming the distance. ''' from scipy.spatial import Delaunay #rearrange the input coordinates: coords = np.array([x, y]) coords = coords.transpose() tri = Delaunay(coords) dist = [] for ii, jj, kk in tri.simplices: xx = tri.points[ii] yy = tri.points[jj] zz = tri.points[kk] dxy = np.sqrt((xx[0]-yy[0])**2+(xx[1]-yy[1])**2) dyz = np.sqrt((yy[0]-zz[0])**2+(yy[1]-zz[1])**2) dzx = np.sqrt((xx[0]-zz[0])**2+(xx[1]-zz[1])**2) dist.append([dxy,dyz, dzx]) return np.array(dist), tri.simplices
[docs]def match_vortex(rxobs, ryobs, rmobs, rxpre, rypre, rmpre, dtol=5., mtol=0.12): ''' match_vortex matches together observed vortices made of the measured stellar positions to the vortices made from the calculated stellar positions. Inputs: - distobs, a vector containing all the distances computed from the 3 to 6 points. - distpre, a vector containing all the computed predicted distances - indices, an indice vector for distpre (which points makes which distance) - tol, the tolerance for matching the distance together. ''' ### Calibrate the magnitudes: rmobs = rmobs - np.median(rmobs)+np.median(rmpre) ### Create now the Griding of the stars: distobs, indexobs = makevortex(rxobs, ryobs) distpre, indexpre = makevortex(rxpre, rypre) rmpre = rmpre[indexpre] ##### ----- MATCHING -----##### ### What is the number of triangles fed to the function: Ntri = distobs.shape[0] ### Sort the distances and use it to sort the indexes too init = np.argsort(distobs, axis=1) nexit = np.sort(distpre, axis=1) valid = np.array([], dtype=int) ### and loop over the observational data for N in range(Ntri): ## 1. Put the observations in good shape, sort, and make ratios dobs = distobs[N, init[N, :]] tmpobs = np.sort(rmobs[indexobs[N, init[N, :]]])[::-1] tmpobs = tmpobs / np.nanmin(tmpobs) ### Min since the values are mostly negative! ## 2. Compute the distance differences? or ratios? ddist = np.abs(nexit - dobs) ## 3. Candidates: dist_cand = (ddist < dtol).mean(axis=1) == 1 mag_cand = rmpre[dist_cand,:] ## 4. Do the ratios of the magnitude and compare the ratios mmaxes = np.meshgrid(np.ones(3), np.nanmax(mag_cand, axis=1))[1] mag_cand = np.sort(mag_cand / mmaxes, axis=1) dmag = mag_cand - tmpobs ## 5. Select the vortexes where the difference in magnitude ratio is smaller ## than mtol mag_cand = ((np.abs(dmag) < mtol).mean(axis=1) == 1.).nonzero()[0] ### 6. Fill the valid array valid_cand = dist_cand.nonzero()[0][mag_cand] a = np.meshgrid(indexobs[N,:], valid_cand)[0] if valid.size == 0: valid = np.dstack((a, indexpre[valid_cand,:])) else: valid = np.vstack((valid, np.dstack((a, indexpre[valid_cand,:])))) print ' Found {} potential good candidates'.format(valid.shape[0]) ### Keep the good triangles ### What is the distance between the matching triangles? if valid.shape[0] > 5: gxobs = np.mean((rxobs[valid[:,:,0]]+np.roll(rxobs[valid[:,:,0]],1,axis=1))/2, axis=1) gyobs = np.mean((ryobs[valid[:,:,0]]+np.roll(ryobs[valid[:,:,0]],1,axis=1))/2, axis=1) gxpre = np.mean((rxpre[valid[:,:,1]]+np.roll(rxpre[valid[:,:,1]],1,axis=1))/2, axis=1) gypre = np.mean((rypre[valid[:,:,1]]+np.roll(rypre[valid[:,:,1]],1,axis=1))/2, axis=1) tridist = np.sqrt((gxobs-gxpre)**2+(gyobs-gypre)**2) occurence, bins = np.histogram(tridist, bins=4) ### We have the assumption that the good matching triangles are all at the same distance: if np.max(occurence) > tridist.size * 0.33: maxoc = np.argmax(occurence) mostdist = tridist < bins[maxoc+1] validated = valid[mostdist,:,:] else: validated = np.array([]) ## 6. We need about 3 vortexes in total to solve the pointing if validated.shape[0] > 3.: return validated else: return np.array([]) else: return np.array([])
[docs]def loadStellarCatalog(**kwargs): ''' loadStellarCatalog load in memory a binary fits table catalog of stellar names and coordinates for further use (for instance predict the visible stars from an observing site). Keywords: - filename: the fits file containing the stellar catalog. By default it's a vizier catalog containing only stellar coordinates. But any other fits.catalog could be loaded. - Vmag, Rmag, Bmag or Jmag, the magnitude range of the stars to load. Default is set from Vmag= 2 to 8, to avoid using too much memory. But any fainter number is fine too. - sID = 'ASCC', 'TYC', 'HIP' or 'HD', the stellar identification - sIDonly, boelan to return only the stellar identifiers. Outputs: - ra, an array of right ascension coordinates, in degree, and in J2000 - dec, an array of declination - vmag, the corresponding stellar visible magnitude - bmag, the B magnitude - sID, the stellar Identifiers - stype, the spectral type ''' import pyfits as pf filename = kwargs.pop('filename', 'StarCat.fits') if 'Rmag' in kwargs: maglimit = kwargs.get('Rmag', None) elif 'Bmag' in kwargs: maglimit = kwargs.get('Bmag', None) elif 'Jmag' in kwargs: maglimit = kwargs.get('Jmag', None) else: maglimit = kwargs.get('Vmag', (-2., 8.)) try: totcat = pf.getdata(filename, ext=1) except IOError: print 'No fits catalog found ... ' return tmag = totcat['Vmag'] mrange = (tmag >= maglimit[0])*(tmag <= maglimit[1]) if 'sIDonly' in kwargs: if kwargs.get('sID', 'ASCC') == 'TYC': tyc1 = totcat['TYC1'][mrange] tyc2 = totcat['TYC2'][mrange] tyc3 = totcat['TYC3'][mrange] sID = np.array(["".join(["TYC","%d"%i,"%d"%j,"%d" %e]) \ for i,j,e in zip(tyc1,tyc2,tyc3)]) elif kwargs.get('sID', 'ASCC') == 'HD': sID = totcat['HD'][mrange] elif kwargs.get('sID','ASCCC') == 'HIP': sID = totcat['HIP'][mrange] else: sID = totcat['ASCC'][mrange] return sID else: ra = totcat['_RAJ2000'][mrange] dec = totcat['_DEJ2000'][mrange] vmag = tmag[mrange] bmag = totcat['bmag'][mrange] Sptype = totcat['SPTYPE'][mrange] if kwargs.get('sID', 'ASCC') == 'TYC': tyc1 = totcat['TYC1'][mrange] tyc2 = totcat['TYC2'][mrange] tyc3 = totcat['TYC3'][mrange] sID = np.array(["".join(["TYC","%d"%i,"%d"%j,"%d" %e]) \ for i,j,e in zip(tyc1,tyc2,tyc3)]) elif kwargs.get('sID', 'ASCC') == 'HD': sID = totcat['HD'][mrange] elif kwargs.get('sID','ASCCC') == 'HIP': sID = totcat['HIP'][mrange] else: sID = totcat['ASCC'][mrange] return ra, dec, vmag, bmag, sID, Sptype
def selectClosest(ra, dec, mag, start, stop, dist=2.5, degree=True): ''' To select the closest stars''' to_reject = [] flagBlended = np.zeros(ra.size, dtype=int) BlendingValue = np.zeros(ra.size) for k in xrange(start, stop): dra = np.abs(ra - np.roll(ra, k)) ddec = np.abs(dec - np.roll(dec, k)) if degree: closest = ((dra < dist/60.)*(ddec < dist/60.)).nonzero()[0] else: closest = ((dra < dist)*(ddec < dist)).nonzero()[0] for j in closest: ## Mark the indice of the stars to rejects (always the faintest of 2) ## and Flag the other star if mag[j] > mag[j-k]: to_reject.append(j) flagBlended[j-k] = 1 BlendingValue[j-k] = 10**((mag[j-k]-mag[j])/2.5) else: to_reject.append(np.abs(j-k)) flagBlended[j] = 1 BlendingValue[j] = (10**((mag[j]-mag[j-k])/2.5)) return to_reject, flagBlended, BlendingValue
[docs]def cleanupStellarCatalog(ra, dec, mag, dist=2.5): ''' Astrostars is a short routine for selecting stars within a catalog that are suited for astrometry. They are bright (limiting magnitude defined via mag1), and have no close neighbours (define close via dist) of magnitude down to mag2. Inputs: - ra, dec, the stellar coordinates - mag, the corresponding stellar magnitudes. Used for rejected the fainter stars Keyword: - dist = 2.5, the distance in arcmin between 2 stars for considering them blended Outputs: - ra, dec mag of the selected stars ''' ### Make a quick test to check whether the ra is in degree or hours: if ra[-1] > 24: degree=True else: degree=False to_reject = [] flagBlended = np.zeros(ra.size, dtype=int) BlendingValue = np.zeros(ra.size) if ra.size > 10000: print ' The size of the catalog is important: turning multiprocessing mode on...' print ' It may takes several minutes to complete ... ' import multiprocessing NCPU = multiprocessing.cpu_count()-1 start = 1 stop = int(round(ra.size/2.+1)) steps = range(start, stop, 2000) steps.append(stop) pool = multiprocessing.Pool(processes=NCPU) result = [pool.apply_async(selectClosest, args=(ra, dec, mag, i, j )) \ for i,j in zip(steps[0:-1], steps[1:])] for r in result: output = r.get() tr, fg, bd = output to_reject.append(tr) flagBlended += fg BlendingValue += bd pool.close() pool.join() to_reject = [item for sublist in to_reject for item in sublist] flagBlended = np.asarray(flagBlended, dtype=str) flagBlended[flagBlended == '1'] = 'B' flagBlended[flagBlended == '0'] = '' else: flagBlended = np.zeros(ra.size, dtype=str) for k in xrange(1, ra.size-1): dra = np.abs(ra - np.roll(ra, k)) ddec = np.abs(dec - np.roll(dec, k)) if degree: closest = ((dra < dist/60.)*(ddec < dist/60.)).nonzero()[0] else: closest = ((dra < dist)*(ddec < dist/60.)).nonzero()[0] for j in closest: ## Mark the indice of the stars to rejects (always the faintest of 2) ## and Flag the other star if mag[j] > mag[j-k]: to_reject.append(j) flagBlended[j-k] = 'B' BlendingValue[j-k] = 10**((mag[j-k]-mag[j])/2.5) else: to_reject.append(np.abs(j-k)) flagBlended[j] = 'B' BlendingValue[j] = (10**((mag[j]-mag[j-k])/2.5)) to_reject = np.asarray(to_reject) to_reject = np.unique(to_reject) print 'Rejecting %g stars because of blending ' %(to_reject.size) ra = np.delete(ra, to_reject) dec = np.delete(dec, to_reject) mag = np.delete(mag, to_reject) flagBlended = np.delete(flagBlended, to_reject) BlendingValue = np.delete(BlendingValue, to_reject) return ra, dec, mag, to_reject, flagBlended, BlendingValue
[docs]def skyQuad(im, tilesize = 300, aligned=True): ''' skyQuad does nothing with the image itself. Creates tiles for further sky quality analysis. Inputs: - the image itself, to draw out the image dimensions Keywords - tilesize, the size of the tiles. - aligned = True. If True the tiles follow a classical grid pattern. Else the tiles are shifted by half a tile for each row. Outputs: - lx, ly, upx, upy, the corners of the sky Quads. ''' ny, nx = im.shape cx, cy = nx/2, ny/2 if isinstance(tilesize, int): tilesizex = tilesize tilesizey = tilesize else: tilesizex, tilesizey = tilesize tileXnumber = nx/tilesizex tileYnumber = ny/tilesizey lx, ly = [],[] if aligned: for k in np.arange(-tileYnumber/2-1, tileYnumber/2+1): if ((cy+tilesizey/2+k*tilesizey<ny) and (cy+tilesizey/2+k*tilesizey>0)): lx.append([cx+tilesizex/2+tilesizex*j for j in np.arange(-tileXnumber/2, tileXnumber/2+1)\ if (cx+tilesizex/2+(j)*tilesizex<nx)and(cx+tilesizex/2+j*tilesizex>0)]) ly.append([cy+tilesizey/2+tilesizey*k for j in np.arange(-tileXnumber/2, tileXnumber/2+1)\ if (cx+tilesizex/2+(j)*tilesizex<nx)and(cx+tilesizex/2+j*tilesizex>0)]) else: for k in np.arange(-tileYnumber/2-1, tileYnumber/2+1): if ((cy+tilesizey/2+(k+1)*tilesizey<ny) and (cy+tilesizey/2+k*tilesizey>0)): if k%2 == 0: lx.append([cx+tilesizex*j for j in np.arange(-tileXnumber/2, tileXnumber/2+1) \ if (cx+(j+1)*tilesizex<nx)and(cx+j*tilesizex>0)]) ly.append([cy+tilesizey/2+tilesizey*k for j in np.arange(-tileXnumber/2, tileXnumber/2+1) \ if (cx+(j+1)*tilesizex<nx)and(cx+j*tilesizex>0)]) if k%2==1: lx.append([cx+tilesizex/2+tilesizex*j for j in np.arange(-tileXnumber/2, tileXnumber/2+1)\ if (cx+tilesizex/2+(j+1)*tilesizex<nx)and(cx+tilesizex/2+j*tilesizex>0)]) ly.append([cy+tilesizey/2+tilesizey*k for j in np.arange(-tileXnumber/2, tileXnumber/2+1)\ if (cx+tilesizex/2+(j+1)*tilesizex<nx)and(cx+tilesizex/2+j*tilesizex>0)]) lx = [item for sublist in lx for item in sublist] ly = [item for sublist in ly for item in sublist] lx = np.array(lx).ravel() ly = np.array(ly).ravel() upx = lx+tilesizex upy = ly+tilesizey return lx, ly, upx, upy
[docs]def percentile(im, value): ''' Percentile computes the qth percentile of the data. It is based on the numpy version percentile, however without any interpolation of the numbers. And is faster than the scipy.scoreatpercentile() function. Input: - im, a 2d or 1d array - value, a scalar for the percentile to calculate. Must be between 0 and 1. Output: - the value of the data at the percentile. ''' tmp = np.ravel(im) data = np.sort(tmp) # find the finite values blop = np.isfinite(tmp).nonzero() nfin = blop[0].size return data[value*nfin]
def barycenter(im): ''' returns the center of mass of an array Input: - im, an array-like Outputs: - xc, yc, the coordinates of the center of mass ''' dim = im.shape if len(dim)< 1: print 'input array must have at least one dimension' return elif len(dim) == 1: xdim = np.arange(dim[0]) xc = np.sum(xdim*im)/np.sum(im) return xc elif len(dim) == 2: xdim = np.arange(dim[1]) ydim = np.arange(dim[0]) X, Y = np.meshgrid(xdim, ydim) xc = np.sum(X*im)/np.sum(im) yc = np.sum(Y*im)/np.sum(im) if xc < 0 or xc > dim[1]: xc = dim[1]/2 if yc < 0 or yc > dim[0]: yc = dim[0]/2 return xc, yc
[docs]def rebin(image, binfactor): ''' rebins an image to a new format. Condition: newformat is smaller than the original image format. Inputs: - image, an numpy array - binfactor, the binning factor Output: - binimage ''' dim = image.shape ny = dim[0] nx = 0 ndim = len(dim) if ndim>1: nx = dim[1] newy = ny / binfactor newx = nx / binfactor resy = ny % binfactor resx = nx % binfactor if (resy or resx) and ndim>1: image = image[resy:, resx:] elif resy and ndim == 1: image = image[resy:] if ndim>1: sh = newy,ny//newy, newx, nx//newx return image.reshape(sh).mean(-1).mean(1) elif ndim == 1: sh = newy, ny//newy return image.reshape(sh).mean(1)
[docs]def statbin(image, binfactor): ''' statbin returns the mean values and standard deviation of the values inside a bin. Inputs: - image, a numpy array, 1D - binfactor, the binning factor. Has to be smaller than the image size... Outputs: - the mean values of the bins - the std values of the bins ''' nx = image.size newx = nx // binfactor resx = nx % binfactor if newx == 0: return 0., 0. if resx ==0: binmage = image.copy() else: binmage = image[resx/2:-resx/2] sh = newx, binfactor mim = np.nanmean(binmage.reshape(sh), axis=1) sim = np.nanstd(binmage.reshape(sh), axis=1) return mim, sim
[docs]def find_outliers(*args, **kwargs): ''' find_outliers searches for outliers in Y as a function of X. Inputs: - X, array of points - Y, array of points correlated to X - bins, the number of bins to consider, default 5 - sig, a rejection criterion, default 4 Output: - a list of the index of the good pixels ''' if len(args) == 1: ysort = args[0] elif len(args) == 2: x, y = args xsort = np.argsort(x) ysort = y[xsort] else: print ' Only 2 arguments accepted' bins = kwargs.get('bins', 5) sig = kwargs.get('sig', 3.) dim = ysort.size step = dim/bins ## test to verify that : # 1. The bins size is smaller than the sample size: if bins > dim: print 'too many bins for the sample size' return elif step < 3: print 'not enough elements in each bins to do statistics' return res = dim % bins while res > step: bins += 1 res = dim - bins*step goodvalues = np.zeros(dim) for j in xrange(bins): if j == bins-1: tmp = ysort[j*step:] else: tmp = ysort[j*step:(1+j)*step] rstd = robuststd(tmp) ## Mark the good values below threshold if j == bins-1: goodvalues[j*step:] = (np.abs(tmp - np.median(tmp)) < sig * rstd) else: goodvalues[j*step:(j+1)*step] = (np.abs(tmp - np.median(tmp)) < sig * rstd) goodvalues = np.array(goodvalues, dtype=bool) return goodvalues
[docs]def robuststd(X): ''' Calculates a robust standard deviation for the distribution X, by removing gradually the outliers. Usefull when a few outliers are skewing the calculation of the standard deviation. Input: - X, array-like Output: - the robust standard deviation of X ''' ## find a "robust" standard deviation dX = X - np.median(X) istd = dX.std() nx = dX.size mm = [max(np.abs(dX)),] dstd = istd - dX[np.abs(dX) < min(mm)].std() gd = (np.abs(dX) < min(mm)).nonzero()[0] while (dstd >= .1) and (dX.size > nx*0.75): dX = X[gd] - np.median(X[gd]) mm.append(max(np.abs(dX))) gd_ = (np.abs(dX) < min(mm)).nonzero()[0] dstd = dX.std() - dX[gd_].std() gd = gd[gd_] robustd = X[gd].std() return robustd
[docs]def gaussian(x, xo, w, A): ''' Returns a gaussian function centered in xo, with a width w and an amplitude A. Inputs: - x, 1d array of the values used for computing the Gaussian - xo, the centre of the gaussian - w, the width of the gaussian - A, the amplitude Output: - G(x) = A exp(-(x-xo)^2/(2W^2)) ''' return A*np.exp(-(x-xo)**2/(2*w**2))
def Gfitting(P, x, y): '''Use for a gaussian fit''' xo, w, a = P['xo'].value, P['w'], P['a'] g = gaussian(x, xo, w, a) E = (y - g)**2 return E