py4sci

Source code for sites

"""
.. module:: sites
.. moduleauthor:: Anna-Lea Lesage   
.. created:: Nov. 2013
.. modified:: Nov. 2014
This module contains the site and camera dependent routines.
Use class Site to define an observing site:
     obs_site = sites.Site(latitude, longitude, name = 'Mysite')
Once the observation site is defined, you can:
    * transform back and forth from ra-dec to alt-az (Equatorial2Horizontal)
    * get the local sidereal time on site (LocalSiderealTime)
    * get the Sun's altitude on site (Sunaltitude)
    * get the apparent stars from a ra-dec list (ApparentStars)
    * the local refraction index (LocalRefraction)
    
Use class CamCoord to define a camera and its pointing direction:
    mycam = sites.CamCoord(altitude, azimuth, orientation, focal=focal)
With a defined camera you can:
    * refine the pointing using (findPointing)
    * or get the astrometric solution (makeAstrometry)
    * transform back and forth from alt,az to x,y
    * transform back and forth from alt,az to phi,theta, the angular coordinates.
    * update/ create a caminfo file saving all the parameters definng the camera
"""

__version__='15.10.01'

### ***  Latest Changes Record  *** ###
### 04.12.2014. Added line 1090 in makeAstrometry to limit the stellar sample to vmag<7 
###             when doing a full astrometric solution (including findPointing). 
###             Goal is to avoid a early exit of the program caused by a lack of detected stars.
### 04.12.2014  Major Bug : Changed indentation line 1070, to respect the else condition.
### 05.12.2014  Included a condition in the astrometry in case the Moon (or anything very 
###             bright) is in the field. If True, the "damaged" pixels are ignored/avoided
### 11.12.2014  Added an updatecaminfo() with condition if stuckloop is true
### 12.01.2015  Added in reload the backup keyword: if set, it reloads the last backup file
### 12.01.2015  Added in getcaminfo a backup keyword. If set, it read the backup file
### 12.01.2015  Optimised the refraction calculation to avoid instability close to horizon.
### 14.01.2015  Added function inflectionPoint which checks whether the polynomial 
###             solution presents an inflection point. The function is used as a condition
###             for finding the first valid polynomial solution.
### 19.01.2015  Added in evaluateAstrometry a removeOutliers routine instead of the findOutliers
###             Added in evaluateAstroemtry 3 increasing fwhm for the FIND routine
### 19.01.2015  Restricted the criterion to write to file and update the camera after 
###             running an astrometric solution
### 02.03.2015  MAJOR CHANGE in solving the astrometry!
###             Solving now the distortions as functions of X and Y without radial polynomial
###             Added functions distortX_tnx, distortY_tnx and solveDistortions for solving
###             the distortions
###             Added function solveAstrometry to be used instead of makeAstrometry
###             solveAstrometry calculates the master astrometric solution to which small 
###             corrections are applied
###             Added the functions solveCorrections and CorrectPositions to calculate and
###             apply first order corrections to the stellar positions
### 07.04.2015  Changed in Pointing the way the orientation of the central camera is
###             fixed. Use now the altitude as criterion instead of the camera name.
### 08.07.2015  Added the function moonposition to the Site class to obtain the moon position
###             and eventually phase at a given time.
### 01.10.2015  Added in CamCoord.logAstrometry an entry flag/rate to have indication on the 
###             detection rates when doing the astrometry
### 01.10.2015  Edited LocalSiderialTime to implement the possibilty to give an array as input
### ******************************* ###


import numpy as np
import bottleneck as bt
import math
#import matplotlib.pyplot as pl

class Site(object):
[docs] """ This class defines a location on Earth from which the sky is observable It is defined by latitude and longitude, and name of the bserving site. """ def __init__(self,*args, **kwargs):
[docs] ''' Generate a site by specifying the latitude and longitude in degrees, and optionally a name for the observatory Inputs: - either the name of a site file (.dat) - or latitude, longitude, elevation ''' nargs = len(args) if nargs==1: name = args[0] self._getsiteinfo(name) self.name = name else: lat, lng, elev = args self.latitude = lat #self.latitude.range = (-90,90) self.longitude = lng #self.longitude.range=(-180,180) self.elevation = elev #self.elevation.range = (0, 5000) name = kwargs.pop('name', 'Default Site') self.name = name sname = kwargs.pop('ID', 'DS') self.sname = sname pressure = kwargs.pop('pressure', None) self.pressure = pressure temperature = kwargs.pop('temperature', None) self.temperature = temperature humidity = kwargs.pop('humidity', None) self.humidity = humidity def _getLatitude(self):
return self._lat def _setLatitude(self, val): self._lat = val latitude = property(_getLatitude, _setLatitude, doc='geographic Latitude given in degree') def _getLongitude(self): return self._long def _setLongitude(self, val): self._long = val longitude = property(_getLongitude, _setLongitude, doc='Geographic Longitude given in degree') def _setTemperature(self, value): self.temperature = value def _setPressure(self, value): self.pressure = value def _getsiteinfo(self, sitename):
[docs] ''' Extract from the siteinfo file all the informations required to run the processing and the exposure directly. siteinfo file is a unique file per station containing: - latitude, longitude, altitude - getaway to the data from the weatherstation? ''' dtypes=[('name', str), ('latitude', float), ('longitude', float), ('altitude', float),('sname', str), ('timezone', str)] try: s = open('siteinfo.dat', 'rb') except IOError: print 'File does not exist...' return bla = s.read() siteinfo = bla.split('\n') lines = [l for l in siteinfo if not l.startswith('#') if not l.strip()==''] lists = [[] for n in dtypes] for l in lines: for i,e in enumerate(l.split(' ')): # attention there are 2 spaces in the file!! lists[i].append(dtypes[i][1](e)) sites = np.rec.fromarrays(lists,names=[e[0] for e in dtypes]) ts = (sites.name == sitename) ts = ts + (sites.sname == sitename) this_site = sites[ts] self.latitude = this_site.latitude[0] self.longitude = this_site.longitude[0] self.elevation = this_site.altitude[0] self.sname = this_site.sname[0] import pytz self.timezone = pytz.timezone(this_site.timezone[0]) return def _writeSiteInfo(self, filename, name = '', ID =''):
[docs] ''' Uses the information given to write a siteinfo.dat file. ''' if name != '': self.name = name if ID != '': self.sname = ID Header = '#Name Latitude Longitude Altitude ID \n' Line = '{0} {1} {2} {3} {4}'.format(self.name, self.latitude, self.longitude, \ self.elevation, self.sname) with open(filename, 'wb') as f: f.write(Header) f.write(Line) def _updateSiteInfo(self, filename='siteinfo.dat'):
[docs] ''' To update a site entry in an existing site info file. ''' try: with open(filename, 'rb') as s: bla = s.read() except IOError: print 'File does not exist... creating one...' self._writeSiteInfo('siteinfo.dat') return siteinfo = bla.split('\n') lines = [l for l in siteinfo if not l.startswith('#') if not l.strip()==''] lists = [] for l in lines: tmp = l.split(' ')[0] lists.append(tmp) ts = np.asarray(lists) == self.name newlines = np.asarray(lines) newlines[ts] = '{0} {1} {2} {3} {4}'.format(self.name, self.latitude, \ self.longitude, self.elevation, self.sname) with open(filename, 'wb') as s: for l in newlines: s.write(l+'\n') return def _appendSiteInfo(self, filename='siteinfo.dat'):
[docs] ''' Append a site information line to the filename. ''' Line = '{0} {1} {2} {3} {4}'.format(self.name, self.latitude, self.longitude, \ self.elevation, self.sname) try: with open(filename, 'ab') as s: s.write(Line + '\n') except IOError: print 'File does not exist... creating one...' self._writeSiteInfo('siteinfo.dat') return def LocalSiderialTime(self, *args, **kwargs):
[docs] ''' Compute the local siderial time at the given Site, based on a time input. Uses the Greenwich Mean/Apparent Sidereal Time for the calculations. Inputs: * the date, either a 'datetime' object, or a float64 in JD, OR a list containing [year, month, day, hour, min, sec] * the exposure time (required to calculate the lst index and lst sequence) Keywords: * apparent, to calculate the apparent sidereal time (LAST) taking into account the nutation of the Earth * outformat, to define the form of the output. Default is in fractional hours (e.g 11.20374563....). Accepted are : - 'seconds', the output is given in second - 'hhmmss', the output is a 3-tuple (hr, mn, sec) * stindex, if set, then the function returns TWO numbers, the LST at site, in the desired format, and the exposure index. * lpstseq, if set then return the sequence number starting 01.01.2013 at midnight Outputs: the local Mean/Apparent Sidereal time in the given output format. Example: To obtain the LAST in Leiden : >>> LE = Site(lat, lng) >>> lst = LE.LocalSiderialTime(datetime.utcnow(), apparent=True) Remark: This does not take into account Time Zones when giving a datetime object in input!!! Refer to UTC instead! ''' from datetime import datetime, timedelta from Masfunct import Greenwich_Mean_Sidereal_Time, calendar2jd, jd2calendar if len(args) ==1: ti = args[0] if np.isscalar(ti): ### Check datatype : ## 1. is it a datetime object? if type(ti) == datetime: jd = calendar2jd(ti) ## 2. is it a JD scalar? elif type(ti) == np.float64: jd = ti else: raise SyntaxError('The date format is not recognized') ## 3. or is it an array of dates? elif ti.dtype != np.float64: jd = calendar2jd(ti) else: jd = ti elif len(args)==2: ti, texp = args if np.isscalar(ti): if type(ti) == datetime: jd = calendar2jd(ti) elif type(ti) == np.float64: jd = ti else: print 'from here???' raise SyntaxError('The date format is not recognized') ## 3. or is it an array of dates? elif ti.dtype != np.float64: jd = calendar2jd(ti) else: jd = ti elif len(args) == 6: yr, mn, day, hr, mn, sec = args jd = calendar2jd([yr,mn,day,hr,mn,sec]) else: raise SyntaxError("What is this date format?") apparent = kwargs.pop('apparent', True) if not np.isscalar(jd): LST = np.array(([Greenwich_Mean_Sidereal_Time(j, apparent) for j in jd] \ + (self._long)/15)%24) else: LST = (Greenwich_Mean_Sidereal_Time(jd,apparent) + (self._long)/15 ) % 24 stindex = kwargs.pop('stindex', False) if stindex: if 'texp' not in locals(): texp = 6.4 stidx = LST*3600/texp gblseq = kwargs.pop('lpstseq', False) if gblseq: from Masconst import iniday, lstday if isinstance(ti, datetime): then = ti else: then = jd2calendar(jd) if 'texp' not in locals(): texp = 6.4 oneday = timedelta(hours=24) onelstday = timedelta(hours=lstday) indexperday = int(oneday.total_seconds()/np.around(texp, decimals=1)) getdif = np.vectorize(lambda x:(x - iniday).total_seconds()) daylst = getdif(then)/onelstday.total_seconds() // 1 if texp < 1.: lpstseq = -1 else: lpstseq = np.array(daylst*indexperday + \ LST*3600/np.around(texp, decimals=1)//1, dtype=long) outformat = kwargs.pop('outformat', None) if outformat is 'seconds': LST = LST*3600 elif outformat is 'hhmmss': hh = LST // 1 mm = ((LST%1)*60)//1 ss = (((LST%1)*60)%1)*60 LST = [hh,mm,ss] if stindex and not gblseq: return LST, stidx elif gblseq and not stindex: return LST, lpstseq elif stindex and gblseq: return LST, stidx, lpstseq else: return LST def Equatorial2Horizontal(self, *args, **kwargs): # TESTED WOrking
[docs] ''' Conversion of Equatorial Coordinates to Horizontal Coordinates for a given local sidereal time at the defined observation site. Inputs: - ra, a vector or scalar of Right Ascensions - dec, a vector or scalar of Declinations - date, a datetime object or JD Outputs: - Alt, Az vectors of same dimensions as the inputs. Example: Converting ra = [1.2391, 20.4982, 35.9821] dec = [-30.4135, 35.0982, -31.0918] to alt az: >>> LE = Site(lat, lng) >>> alt, az = LE.Equatorial2Horizontal(ra, dec, date) The function shouldn't be used alone since it doesn't make any correction for precession, nutation, aberration nor refraction. ''' from datetime import datetime from Masfunct import rect2pol, calendar2jd, epoch2jd from Masconst import deg2rad if len(args) == 3: ra, dec, date = args if ra.dtype!=np.float64: ra.dtype = np.float64 dec.dtype = np.float64 else: raise SyntaxError("Input is RightAscensio, Declination and Date") if type(date)==datetime: jd = calendar2jd(date) elif date < 3000. : # Case where the input is in fractional years. jd = epoch2jd(date) else: jd = date # Convert the ra and dec into array if they aren' already: ra = np.array(ra) dec = np.array(dec) degree = kwargs.pop('degree', True) if degree: # Calculation of the hour angle of the stars: tau = (self.LocalSiderialTime(jd, apparent=True)*15 - ra)%360 ch = np.cos(tau * deg2rad) sh = np.sin(tau * deg2rad) cd = np.cos(dec *deg2rad) sd = np.sin(dec *deg2rad) cl = np.cos(self._lat * deg2rad) sl = np.sin(self._lat * deg2rad) else: # Calculation of the hour angle of the stars: tau = (self.LocalSiderialTime(jd, apparent=True)*(np.pi/12) - ra)%np.pi ch = np.cos(tau) sh = np.sin(tau) cd = np.cos(dec) sd = np.sin(dec) cl = np.cos(self._lat) sl = np.sin(self._lat) Z = np.array([-ch*cd*sl + sd*cl, -sh*cd, ch*cd*cl+sd*sl]) az, alt = rect2pol(Z, Eq2AltAz=True) return alt, az%360 def Horizontal2Equatorial(self, *args, **kwargs):
[docs] ''' Converts the horizontal coordinates which are site dependent to equatorial coordinates. Inputs: - alt, a scalar or vector of altitudes - az, a scalar or vector of azimuth, of same size as alt. - date, the date to perform the conversion. It can be a datetime object, or a float64(JD) Keywords: - refraction, if True computes the refraction corrected coordinates. - degree, (default = True) set if the input coordinates are in degree. Outputs: - Ra, a scalar or vector of Right ascensions - Dec, a scalar or vector of declinations ''' from datetime import datetime from Masfunct import calendar2jd, rect2pol from Masconst import deg2rad # What is the date format? if len(args)==3: alt, az, date = args else: raise SyntaxError("Inputs are Alt, Az, Date") if type(date)== datetime: jd = calendar2jd(date) else: jd = date # Start conversion # Taking into account atmospheric refraction: refraction = kwargs.pop('refraction', False) degree = kwargs.pop('degree', True) if refraction: R = self.localrefraction(alt, observed=True) alt = alt - R # Calculate LAST last = self.LocalSiderialTime(jd, apparent = True) if degree: cz = np.cos(az*deg2rad) sz = np.sin(az*deg2rad) ca = np.cos(alt*deg2rad) sa = np.sin(alt*deg2rad) else: cz = np.cos(az) sz = np.sin(az) ca = np.cos(alt) sa = np.sin(alt) cl = np.cos(self._lat*deg2rad) sl = np.sin(self._lat*deg2rad) Z = np.matrix([-cz*sl*ca + sa*cl, -sz*ca, sl*sa+cz*ca*cl]) ha, dec = rect2pol(Z) ra = (last*15 - ha)%360 return ra, dec def ApparentStars(self, *args, **kwargs):
[docs] ''' Calculate from a list of (ra, dec) coordinates which stars are visible from Site at the given time Inputs : - ra, a scalar or a vector of right ascension coordinates - dec, a scalar or a vector of declinations - date, the time chose for the calculation. Can be datetime object or JD Keywords: - precession= True, account for precession in the coordinate conversion - nutation=True, account for nutation in the coordinate conversion - aberration=True, account for annual aberrations - refraction=True, to computer the apparent altitude of the stars Outputs: - alt, az the horizontal coordinates of the apparent stars Attention!!! the dimension of alt, az may be smaller than the inputs dimensions. If the star is not visible, no output! ''' from datetime import datetime from Masfunct import calendar2jd, rect2pol, pol2rect, epoch2jd from Coordinates import Precession_matrix, Nutation_matrix, Aberration if len(args) == 3: ra, dec, date = args else: raise SyntaxError("Input is RightAscension, Declination and Date") if type(date)==datetime: jd = calendar2jd(date) elif date < 3000. : # Case where the input is in fractional years. jd = epoch2jd(date) else: jd = date precession = kwargs.pop('precession', True) nutation = kwargs.pop('nutation', True) aberration = kwargs.pop('aberration', True) if precession is True and nutation is True: P = Precession_matrix(jd) N = Nutation_matrix(jd) X = pol2rect(ra, dec) Y = N*P*X nra, ndec = rect2pol(Y) elif precession is True and nutation is False: P = Precession_matrix(jd) X = pol2rect(ra, dec) Y = P*X nra, ndec = rect2pol(Y) else: nra, ndec = ra, dec if aberration: # With the new convention, are aberration needed?? dra, ddec = Aberration(ra, dec, jd) nra = nra+dra ndec = ndec+ddec alt, az = self.Equatorial2Horizontal(nra, ndec, jd) refraction = kwargs.pop('refraction', True) if refraction: dalt = self.localrefraction(alt) alt = alt + dalt visible = kwargs.pop('visible', 10) vis = (alt > visible).nonzero() alt = alt[vis] az = az[vis] return alt, az, vis def Sunaltitude(self, *args, **kwargs):
[docs] ''' Sunaltitude calculates the altitude of the Sun at the given site for a given time/date. Inputs: - the chosen date. Supported formats are: *datetime:object*, *JD:float64* or a list/tupler [yy,mth, day, hr,mn,sec] Keywords: - apparent, set this to take into account the nutation of the Earth's rotation. Output: the altitude of the Sun. ''' from Coordinates import SunCoord from datetime import datetime from Masfunct import calendar2jd # Check the format of the date input: if len(args)==6: # yy,mth,day,hr,min,sec input yr, mth, day, hr, mn, sec = args jd = calendar2jd([yr,mth,day,hr,mn,sec]) elif len(args)==1: date = args[0] if type(date)==datetime: jd = calendar2jd(date) else: jd = date else: raise SyntaxError("Date format is yr,mth,day,mn,sec or datetime object or JD") # Get the Coordinates of the Sun Sra, Sdec = SunCoord(jd) # Convert those to alt-az: salt, saz = self.Equatorial2Horizontal(Sra, Sdec, jd) # And take into account atmospheric refraction? refraction = kwargs.pop('refraction', True) R = 0 if refraction: R = self.localrefraction(salt) return salt+R, saz def localrefraction(self, *args, **kwargs):
''' localrefraction calculates the small deviations dalt between the observed altitude ofthe stars and their real altitude (outside the Earth's atmosphere). dalt is calculated using the formula from Meeus, and is valid for any positive altitude. There is an error of 0.07 arcminute for altitudes between 12 and 0 degrees. Inputs: - alt, the true altitude Outputs: - dalt, the difference to the observed altitude. ''' from Masconst import deg2rad, To, Po alt = args[0] # Set temperature and Pressure of the site... # Case 1. No pressure or temperature are available for the Site. # Calculate then the approximated pressure at the site altitude... if self.temperature is None: self.temperature = 15. if self.pressure is None: from Masconst import go, R, M self.pressure = Po * np.exp(-go*M*self.elevation/(R*(To+self.temperature))) if self.humidity is None: self.humidity = 0.5 # 50% # From Meeus, Astronomical Algorithms, chapter 15 # Do we want the correction from the observed position, or from the absolute # position? precise = kwargs.pop('precise', True) observed = kwargs.pop('observed', False) if observed: R = self.__refract_forward(alt) else: if precise: R = self.__refract_forward(alt) current = alt+R last = current + 5 while (np.abs(last-current).mean()*3600 > 2.): last = current R = self.__refract_forward(current) current = alt+R else: R = 1.02/np.tan((alt + 10.3/(alt+5.11))*deg2rad)/60.0 R = R *self.pressure/Po*To/(self.temperature+To) return R def __refract_forward(self, alt): from Masconst import deg2rad w = alt < 15. R = 0.0166667/np.tan((alt + 7.31/(alt+4.4))*deg2rad) R[w] = 3.569*(0.1594+0.0196*alt[w]+0.00002*alt[w]**2)/(1.+0.505*alt[w] + \ 0.0845*alt[w]**2) corr = self.pressure/1010*283./self.temperature R = R*corr return R / 3600. class CamCoord(object):
[docs] ''' The CamCoord class defines the orientation of the camera on sky. It is defined by the location of the center of the lens on the camera, Xo and Yo, the pointing on sky, Alto and Azo, and the rotation of the CCD, Tho. ''' def __init__(self, *args, **kwargs):
[docs] '''Generates a camera by specifying the charateristics of the lens, and the pointing, orientation of the CCD. In addition a name is attributed to the camera. * Alto, Azo are the initial pointing of the camera on sky * Tho is the rotation angle of the camera * Xo, Yo the position of the center of the lens --may not coincide with the center of the CCD. ''' nargs = len(args) if nargs == 0: ### No pointing provided focal = kwargs.get('focal', 24) pixsize = kwargs.get('pixsize', 9)*1e-3 self.focal = focal self.pixsize = pixsize self.nx = kwargs.get('nx', 2048) self.ny = kwargs.get('ny', 2048) self.Xo = self.nx/2 self.Yo = self.ny/2 self.Alto = kwargs.get('altitude', 45) print 'Pointing altitude is {}degree above horizon'.format(self.Alto) self.Azo = kwargs.get('azimuth', 0.) print 'Pointing azimuth is {}degree from North'.format(self.Azo) self.Tho = kwargs.get('orientation', 0.) print 'Camera orientation is {}degree from North'.format(self.Tho) name = kwargs.get('name', 'TestCamera') self.name = name self.filename = self.name+'.dat' self.sname = name[0] self.delta = 10. CD = np.zeros((2,2)) CD[0,0] = CD[1,1] = 1. CRPIX = np.zeros(2) self.tnx = {'cd':CD, 'coefx':0, 'coefy':0, \ 'xterm':0, 'crpix':CRPIX, 'xfit':0, 'yfit':0} if nargs == 1: filename = kwargs.get('filename', '') self.getcaminfo(args[0], filename=filename) self.tnx['cd'].shape = (2,2) self.rfov = np.arctan(math.sqrt(self.nx**2 + self.ny**2)/2 * self.pixsize/self.focal) self.xfov = np.arctan(self.nx/2 * self.pixsize/self.focal) self.yfov = np.arctan(self.ny/2 * self.pixsize/self.focal) self.Pco = 1./np.tan(self.rfov) self.backup = False self.rate = 0 self.tnx_back = {} xCor = np.array([0,1,0], dtype=float) yCor = np.array([0,0,1], dtype=float) self.Correction = {'orderCor':1, 'xCor':xCor, 'yCor':yCor} def __adjust_altitude(self):
''' This function is designed in particular for the case where the camera may point toward zenith. Finding the pointing in that case is difficult. The function allows for small variations around the 90degree value by staying around that position. ''' if self.Alto < 90: pass # Then the function is not needed elif self.Alto >=90: self.Alto = 90. - self.Alto%90 return def Hor2PhiThe(self, *args, **kwargs): ''' Convert the horizontal coordinates (Alt-Az) to the polar like Phi-Theta coordinates of the detector. Phi is a radius-like angle ranging from [0, rfov]. Theta is the angle defined from Zenith to the star, and ranging from [0, 360]. Inputs: - Alt, the altitude of the target, a scalar or a vector - Az, the azimuth of the target, a scalar or a vector Outputs: Phi Theta ''' from Masconst import deg2rad from Masfunct import pol2rect, rect2pol, rotmat if len(args) != 2: raise SyntaxError('Inputs are Alt and Az!!') else: alt, az = args degree = kwargs.pop('degree', True) if degree: rAlt = alt newaz = (az - self.Azo) % 360 PX = pol2rect(newaz, rAlt, degree=True) else: rAlt = alt newaz = (az - self.Azo*deg2rad) % (np.pi*2) PX = pol2rect(newaz, rAlt, degree=False) R = rotmat(90.-self.Alto, 'y', degree=True) # Proceed to matrix rotation NX = R*PX # Return to polar coordinates theta, phi = rect2pol(NX) phi, theta = (90 - phi)%360, (theta - self.Tho)%360 goodpoints = (phi*deg2rad <= self.rfov*1.0) return phi, theta, goodpoints def PhiThe2XYR(self, *args, **kwargs): ''' Convertion of the polar Phi-theta coordinates to the rectangular coordinates describing the CCD (X-Y-R). Inputs: - phi, a vector or a scalar, representing the angular distance between the pointing vector and the stars. - theta, a vector or a scalar, representing the angle from Zenith. - Pc, an array-like containing polynomial coefficients Keyword: - degree is True is the inputs are in degree Outputs: - X,Y,R have the same dimensions as phi-theta Remark: the X output is mirrored to match the indexation of the image matrix. ''' from Masconst import deg2rad phi, theta = args degree = kwargs.pop('degree', True) # Are the input angles in degree or in rad? if degree: tp = np.tan(phi*deg2rad) rn = self.Pco * tp r = rn * math.sqrt(self.nx**2 + self.ny**2)/2 x = - np.cos(theta*deg2rad)*r + self.Xo ### Here mirror the x!! y = np.sin(theta*deg2rad)*r + self.Yo return x,y,r else: tp = np.tan(phi) rn = self.Pco * tp r = rn *math.sqrt(self.nx**2 + self.ny**2)/2 x = - np.cos(theta)*r + self.Xo ### Here mirror the x!! y = np.sin(theta)*r + self.Yo return x,y,r def PhiThe2Hor(self, *args): ''' Convert the camera dependent polar coordinates Phi-Theta into Horizontal coordinates Az-Alt. Inputs: - phi, a vector or a scalar - theta, a vector or a scalar of same dimension as phi Outputs: - the corresponding horizontal coordinates. ''' from Masfunct import pol2rect, rotmat, rect2pol if len(args) == 2: Phi, The = args else: raise SyntaxError("Inputs are Phi, theta") theta = (The +self.Tho)%360 phi = 90. - Phi X = pol2rect(theta, phi, degree=True) R = rotmat(self.Alto-90, 'y', degree=True) az, alt = rect2pol(R*X) az = (az + self.Azo)%360 return alt%360,az def XYR2PhiThe(self, *args,**kwargs): ''' Convert the pixel coordinates (X-Y) into polar detector coordinates (Phi-Theta). Inputs: - X, Y, two vectors of same size or two scalars, representing the location of an object on the CCD. X and Y usually scale within the number of pixel of the CCD. Outputs: Phi, The ''' from Masconst import rad2deg X, Y = args r = np.sqrt((self.Xo-X)**2 + (Y-self.Yo)**2) rn = 2*r / math.sqrt(self.nx**2 + self.ny**2) the = (np.arctan2(Y - self.Yo, self.Xo - X)*rad2deg) %360 phi_ = rn * 1./ self.Pco phi = np.arctan(phi_)*rad2deg return phi%360, the, rn def distortX_tnx(self, *args, **kwargs):
[docs] ''' tnx function for calculating the lens distortions ''' from numpy.polynomial.polynomial import polyval from itertools import product if len(args) == 3: params, x, y = args elif len(args) == 2: x, y = args params = self.tnx['coefx'] forward = kwargs.get('forward', True) solve = kwargs.get('solve', False) ### when solving the distortions: return the uncorrected solution if solve: return x ### Forward is for calculating the star position on the CCD: from ra-dec to x-y if forward: xin = self.tnx['cd'][0,0]*(x-self.tnx['crpix'][0]) + self.tnx['cd'][0,1]*(y-self.tnx['crpix'][1]) yin = self.tnx['cd'][1,0]*(x-self.tnx['crpix'][0]) + self.tnx['cd'][1,1]*(y-self.tnx['crpix'][1]) xp = 0.0 xterm = self.tnx['xterm'] if xterm == 1: # Full cross-term solution params.shape = (self.tnx['xorder'], self.tnx['yorder']) for i,j in product(range(self.tnx['xorder']), range(self.tnx['yorder'])): xp = xp + xin**i * yin**j * params[i,j] elif xterm == 0: # No cross-terms xp = polyval(xin, params) elif xterm == 2: # Half cross-term solution maxxt = max(self.tnx['xorder'], self.tnx['yorder'])-1 count = 0 for i,j in product(range(self.tnx['xorder']), range(self.tnx['yorder'])): if (i+j) <= maxxt: xp = xp + xin**i * yin**j * params[count] count +=1 ### else we calculate the solution backwards: to get the ra-dec from the x-y else: xin = self.tnx_back['cd'][0,0]*(x-self.tnx_back['crpix'][0]) + \ self.tnx_back['cd'][0,1]*(y-self.tnx_back['crpix'][1]) yin = self.tnx_back['cd'][1,0]*(x-self.tnx_back['crpix'][0]) + \ self.tnx_back['cd'][1,1]*(y-self.tnx_back['crpix'][1]) xp = 0.0 xterm = self.tnx['xterm'] if xterm == 1: # Full cross-term solution params.shape = (self.tnx['xorder'], self.tnx['yorder']) for i,j in product(range(self.tnx['xorder']), range(self.tnx['yorder'])): xp = xp + xin**i * yin**j * params[i,j] elif xterm == 0: # No cross-terms xp = polyval(xin, params) elif xterm == 2: # Half cross-term solution maxxt = max(self.tnx['xorder'], self.tnx['yorder'])-1 count = 0 for i,j in product(range(self.tnx['xorder']), range(self.tnx['yorder'])): if (i+j) <= maxxt: xp = xp + xin**i * yin**j * params[count] count +=1 xp = -xp ximodel = xin+xp return ximodel def distortY_tnx(self, *args, **kwargs):
[docs] ''' tnx function for calculating the lens distortions ''' from numpy.polynomial.polynomial import polyval from itertools import product if len(args) == 3: params, x, y = args elif len(args) == 2: x, y = args params = self.tnx['coefy'] forward = kwargs.get('forward', True) solve = kwargs.get('solve', False) ### when solving the distortions: return the uncorrected solution if solve: return y ### Forward is for calculating the star position on the CCD: from ra-dec to x-y if forward: xin = self.tnx['cd'][0,0]*(x-self.tnx['crpix'][0]) + \ self.tnx['cd'][0,1]*(y-self.tnx['crpix'][1]) yin = self.tnx['cd'][1,0]*(x-self.tnx['crpix'][0]) + \ self.tnx['cd'][1,1]*(y-self.tnx['crpix'][1]) yp = 0.0 xterm = self.tnx['xterm'] if xterm == 1: # Full cross-term solution params.shape = (self.tnx['xorder'], self.tnx['yorder']) for i,j in product(range(self.tnx['xorder']), range(self.tnx['yorder'])): yp = yp + xin**i * yin**j * params[i,j] elif xterm == 0: # No cross-terms yp = polyval(yin, params) elif xterm == 2: # Half cross-term solution maxxt = max(self.tnx['xorder'], self.tnx['yorder'])-1 count = 0 for i,j in product(range(self.tnx['xorder']), range(self.tnx['yorder'])): if (i+j) <= maxxt: yp = yp + xin**i * yin**j * params[count] count +=1 ### else we calculate the solution backwards: to get the ra-dec from the x-y else: xin = self.tnx_back['cd'][0,0]*(x-self.tnx_back['crpix'][0]) + \ self.tnx_back['cd'][0,1]*(y-self.tnx_back['crpix'][1]) yin = self.tnx_back['cd'][1,0]*(x-self.tnx_back['crpix'][0]) + \ self.tnx_back['cd'][1,1]*(y-self.tnx_back['crpix'][1]) yp = 0.0 xterm = self.tnx['xterm'] if xterm == 1: # Full cross-term solution params.shape = (self.tnx['xorder'], self.tnx['yorder']) for i,j in product(range(self.tnx['xorder']), range(self.tnx['yorder'])): yp = yp + xin**i * yin**j * params[i,j] elif xterm == 0: # No cross-terms yp = polyval(yin, params) elif xterm == 2: # Half cross-term solution #params.shape = (tnx['xorder'], tnx['yorder']) maxxt = max(self.tnx['xorder'], self.tnx['yorder'])-1 count = 0 for i,j in product(range(self.tnx['xorder']), range(self.tnx['yorder'])): if (i+j) <= maxxt: yp = yp + xin**i * yin**j * params[count] count +=1 #yp = yp + polyval(yp, self.tnx['yfit']) yp = - yp yimodel = yin + yp return yimodel def smallDistortX(self, x, solve=False, forward=True, noDistort=False):
''' smallDistortX adds a low order distortion in X direction which hasn't been resolved previously by solveDistort. Input: - x, an array of X CCD positions which need to be slightly moved Keywords: - solve, if True, returns without applying the small correction - forward, if True, to go from alt-az to x, y. If False, to go from xobs, yobs to alt,az Output: - x corrected ''' from numpy.polynomial.polynomial import polyval if solve or noDistort: return x dx = polyval(x, self.tnx['xfit']) if forward: return x - dx else: return x + dx def smallDistortY(self, y, solve=False, forward=True, noDistort=False): ''' same as for X''' from numpy.polynomial.polynomial import polyval if solve or noDistort: return y dy = polyval(y, self.tnx['yfit']) if forward: return y - dy else: return y + dy def VisibleStars(self, *args, **kwargs):
[docs] ''' Convert from alt-az to camera specficic cartesians coordinates. The conversion take into account the characteristics of the camera lens, which defines the camera. In addition a N order polynomial is provided to correct the aberration of the objective. Inputs: - alt, az, two vectors of equal size Outputs: - x, y, two vectors of equl size giving the pixel coordinates of the stars. - inCCD, the indices of the stars inside the camera. To be used later in the reduction process for the ID and the Vmag... Keywords: - margin, set this keyword to define a margin around the CCD. This is especially usefull when the pointing of the camera is not precise. - degree, to do the calculation in degree or if the inputs are in degrees global, set this keyword to the value defining by how much the predicted stellar position can be outside the CCD. - order, the order of the polynomial for fitting the lens - local, default is True. If set, only X and Y coordinates inside the CCD are returned. If False, then use inCCD to obtain the coordinates only in the CCD Example: To find the stellar position in the CCD: >>> x, y, inCCD = Cam.VisibleStars(alt, az, degree=True) To find stars in the CCD with a margin of 500pixels: >>> x, y, glCCD = Cam.VisibleStars(alt, az, degree=True, margin=500) ''' alt, az = args degree = kwargs.pop('degree', True) solve = kwargs.get('solve', False) local = kwargs.get('local', True) noDistort = kwargs.get('noDistort', False) Phi,The, GP = self.Hor2PhiThe(alt, az, degree=degree) xp, yp,r = self.PhiThe2XYR(Phi, The) x = self.distortX_tnx(xp, yp, solve=solve) y = self.distortY_tnx(xp, yp, solve=solve) x = self.smallDistortX(x, solve=solve, noDistort=noDistort) y = self.smallDistortY(y, solve=solve, noDistort=noDistort) margin = kwargs.pop('margin', 0) inCCD = (x > 0-margin)*(x < self.nx+margin)*(y > 0-margin)*(y < self.ny+margin) inCCD = inCCD * GP if local: x, y = x[inCCD], y[inCCD] return x,y, inCCD.nonzero()[0] def ReverseTnx(self):
[docs] ''' Calculates the backward coefficients of the distortions for going from pixel coordinates to horizontal coordinates ''' CDB = np.linalg.inv(self.tnx['cd']) self.tnx_back['cd'] = CDB self.tnx_back['crpix'] = -self.tnx['crpix'] self.tnx_back['coefx'] = - self.tnx['coefx'] self.tnx_back['coefy'] = - self.tnx['coefy'] return def ProjectionOnSky(self, *args):
[docs] ''' ProjectionOnSky transforms the X-Y coordinates of the stars found on the CCD into sky alt-az coordinates. The conversion tankes into accound the lens properties, via an N order polynomial to correct for most distortions. Inputs: - x, y, two vectors of equal size representing the coordinates of the stars on the CCD Outputs: - alt, az, two vectors of equal size giving the horizontal stellar coordinates. ''' xobs, yobs = args self.ReverseTnx() xi = self.smallDistortX(xobs, forward=False) yi = self.smallDistortY(yobs, forward=False) xp = self.distortX_tnx(xi, yi, forward=False) yp = self.distortY_tnx(xi, yi, forward=False) phobs, thobs, rnobs = self.XYR2PhiThe(xp, yp) alt, az = self.PhiThe2Hor(phobs, thobs) alt[alt > 90] = 90 - alt[alt>90]%90 return alt, az def solveAstrometry(self, *args, **kwargs):
[docs] ''' solveAstrometry calculates the astrometric solution for the observing set-up. First it verifies the current pointing/lens solution by searching for an observed star in the neighborhood of a predicted position. If the find rate is very high (define very high with prerate), the pointing/lens parameters are correct, and the program exits immediately. Inputs: - im, the image - alt, az, mag, three vectors of equal length for the horizontal coordinates and magnitude of the catalog stars. Keywords: - prerate, a scalar defining the rate at which the matching is good. - dtol, mtol, 2 scalars used in matchvortex, to account for scaling and magnitude variations. - display. If True, plots showing the progress are displayed. - log_to_queue, if True, a QueueHandler is created to pass log records to the main module via a Queue. Valid for multiprocessing. If False, the inputs are directly printed on screen Returns: - the Cam instance is changed on-the-fly. ''' import Masfunct from lmfit import Parameters, minimize from numpy.polynomial.polynomial import polyfit if len(args) ==5: log_to_queue = kwargs.get('log_to_queue', True) elif len(args) == 4: log_to_queue = kwargs.get('log_to_queue', False) debug = kwargs.get('debug', False) quiet = kwargs.get('quiet', False) #mtol = kwargs.pop('mtol', 0.5) #dtol = kwargs.pop('dtol', 0.25) prerate = kwargs.get('prerate', 0.15) xorder = kwargs.get('xorder', 6) yorder = kwargs.get('yorder',6) xterm = kwargs.get('xterm', 2) niter = kwargs.get('niter', 3) if log_to_queue: import logging import multiprocessing im, alt, az, mag, qlog = args logger = logging.getLogger(multiprocessing.current_process().name) logger.setLevel(logging.INFO) logger.debug('Checking the match rates' ) else: im, alt, az, mag = args ############################3 ## Verify the inputs: if type(im) == np.ndarray: if not im.ndim == 2: print 'Error! The inputs are: image, alt, az, magnitude, (logger)' return else: print 'Error! The inputs are: image, alt, az, magnitude, (logger)' return ############################# ## Check the brightness of the image: if bt.nanmedian(im) > 20000: ## If the image is too bright to find stars on it if log_to_queue: logger.warning('Image is too bright for astrometry') if not quiet: print 'WARNING -- Image is assumed too bright for performing the astrometry,' + \ 'median(im) = {}' .format(bt.nanmedian(im)) return ## test if the Moon or any other bright source is in the image elif (Masfunct.percentile(im, 0.95)>9500)+(bt.nanstd(im) > 2000): if log_to_queue: logger.warning('Bright elements in the image, clearing image') if not quiet: print 'WARNING --There are bright elements in the image. This may not be the '+\ 'optimal image to perform the astrometry ...' # First determine the observed position of the stars on the CCD: xpre, ypre, inCCD = self.VisibleStars(alt, az, margin=-50) ## Find stars close to the predicted positions xobs, yobs, matchit, dist_ = self.evaluateAstrometry(im, xpre, ypre, rad=20) ## Get root mean square distance dist0 = np.sqrt(bt.nansum(dist_**2)/dist_.size) ## Detection rate: how many stars were found close to the given coordinates drate = float(xobs.shape[0])/xpre.shape[0] if log_to_queue: logger.info('RMS distance : %g, detection rate %g' %(dist0, drate)) if not quiet: print 'RMS distance : %g, detection rate: %g' %(dist0, drate) if dist0 < prerate: # This is the case when the Pointing and the lens parameters are really well # estimated. Then nothing has to be done. if log_to_queue: logger.info('Astrometry -- exiting dist < prerate') if not quiet: print ' Astrometry -- exiting dist < prerate' self.delta = dist0 if dist0>0. else prerate return elif (dist0 <= prerate*4) or (drate > 0.1): # In this case, the pointing is quite correct, but probably the lens parameters # can be optimized. if log_to_queue: logger.info('Astrometry -- quick repoint ') if not quiet: print ' Astrometry -- Quick repoint' alto, azo, tho = self.Alto, self.Azo, self.Tho params = Parameters() params.add('azo', value=self.Azo, min = azo-10, max=azo+10) params.add('tho', value=self.Tho, min= tho-10, max=tho+10, vary=True) params.add('alto', value=self.Alto, min=alto-10, max=alto+10) params.add('xo', value=self.Xo, min=self.nx/2-150, \ max=self.nx/2+150, vary=False) params.add('yo', value=self.Yo, min=self.ny/2-100, \ max=self.ny/2+100, vary=False) tmpalt, tmpaz = alt[inCCD], az[inCCD] pialt, piaz = tmpalt[matchit], tmpaz[matchit] pixobs, piyobs = xobs, yobs if not quiet: print " Initial parameters: ", self.Alto, self.Azo, self.Tho, self.Xo, self.Yo minimize(self.Pointing, params, args=(pixobs, piyobs, pialt, piaz)) if not quiet: print " New parameters: ", self.Alto, self.Azo, self.Tho, self.Xo, self.Yo ### Changed by ANL 3.12.2014 xpre, ypre, inCCD = self.VisibleStars(alt, az, margin=-25, solve=True) xf, yf, matchit, dist_ = self.evaluateAstrometry(im, xpre, ypre, rad=25) if 'xorder' not in self.tnx.keys(): self.tnx['xorder'] = xorder if 'yorder' not in self.tnx.keys(): self.tnx['yorder'] = yorder self.solveDistortions(xf, yf, xpre[matchit], ypre[matchit], xorder=xorder, \ yorder=yorder, xterm = xterm, niter = niter) ## Verify the solution: xpre, ypre, inCCD = self.VisibleStars(alt, az, margin=-10, noDistort=True) xf, yf, mf, dist_ = self.evaluateAstrometry(im, xpre, ypre, rad=10) rmsdist_ = np.sqrt(bt.nansum(dist_**2)/dist_.size) if not quiet: try: print ' TNX distortions: \n RMS distance: {}, detection rate: {}'.format( \ rmsdist_, dist_.size/float(xpre.size)) except IOError: if log_to_queue: logger.error('IOError', exc_info=True) logger.info(' TNX distortions: \n RMS distance: {}, detection rate: {}'.format( \ rmsdist_, dist_.size/float(xpre.size))) tmpalt = alt[inCCD] tmpaz = az[inCCD] xalt, xaz = tmpalt[mf], tmpaz[mf] ## if solution degrades too much if rmsdist_ > self.delta+0.2: self.getcaminfo(self.filename) self.tnx['cd'].shape = (2,2) xpre, ypre, inCCD = self.VisibleStars(xalt, xaz, margin=-15, noDistort=True) ## Correct for small order distortions which couldn't be resolved before: self.tnx['xfit'] = polyfit(xf, xpre[mf]-xf, 3) self.tnx['yfit'] = polyfit(yf, ypre[mf]-yf, 3) ## Re-verify solution xpre, ypre, inCCD = self.VisibleStars(xalt, xaz, margin=20) dx = xpre-xf dy = ypre-yf dist = np.sqrt(dx**2+dy**2) rmsdist = np.sqrt(bt.nansum(dist**2)/dist.size) if debug: with open('astrolog.log', 'a+') as f: line = '{0:1.5f}, {1:1.5f}, {2:1.5f}, {3:1.5f}, {4:1.5f}, {5:1.5f}'.format( \ dx.std(), np.sqrt(np.sum(dx**2)/dx.size), dy.std(), np.sqrt(np.sum(dy**2)/dy.size), \ dist.std(), rmsdist) f.write(line+'\n') if log_to_queue: logger.info('Corrected mean distance %g' %rmsdist) logger.info('Running the master astrometric solution OK') if not quiet: print " Corrected distance: ", rmsdist if rmsdist < self.delta: self.delta = rmsdist self.writeCameraFile(overwrite=True) self.delta = rmsdist if rmsdist>0. else prerate return def find_pointing(self, im, alt, az, vmag, force=False, dtol=2., mtol=0.12, Niter=200, blind=False):
''' Find the pointing of the camera Inputs: - im, the night image to use as reference for fonding the pointing - alt, az, the horizontal coordinates of the catalog stars calculated at the time the reference image was taken - vmag, the corresponding Vmagnitude of the visible stars. Keywords: - force, to force the routine to run on the image, even if the image doesn't satisfy the optimal conditionss - dtol, the tolerance on the distance. Default is 5 pixels - mtol, the tolerance on the magnitude ratio. Default is 0.1 - Niter, the maximum number of iterations to find a pointing. - blind, to search blindly for the pointing. ''' ### 1. Verify the quality of the image for finding the pointing: nbadpix = (im >= 64500).nonzero()[0] nonoptimal = False if np.nanstd(im) > 750 and nbadpix > 500: nonoptimal = True ### 2. Unless the force keyword is put, we returns now: if nonoptimal: print ' The optimal conditions for finding the pointing are not met on this image. \n' if not force: return ### 3. Find the stars on the image, optimize detection... from numpy.polynomial.polynomial import polyfit, polyval from scipy.ndimage import filters import Masphot from Masconst import rad2deg ## a. get roughly a profile of the background ny, nx = im.shape x = np.arange(nx) y = np.arange(ny) xfit = polyfit(x, np.median(im[ny/2-50:ny/2+50, :], axis=0), 3) yfit = polyfit(y, np.median(im[:, nx/2-50:nx/2+50], axis=1), 3) xprof = polyval(x, xfit) yprof = polyval(y, yfit) a, b = np.meshgrid(xprof, yprof) profile = (a*b)/np.max(a*b) tim = im / profile ## b. Convolve the image with a gaussian kernel (blurring it) gim = filters.gaussian_filter(tim, 3., order=0) ## c. Find the stars: xobs, yobs = np.array([]), np.array([]) for fwhm in [3.5, 4.5]: xo, yo, s, r = Masphot.find(gim, gim.std()*0.5, fwhm, nsigma=1.5) xobs = np.hstack((xobs, xo)) yobs = np.hstack((yobs, yo)) ### Remove possible duplicates for the smae star: tmpxobs = np.around(xobs, decimals=1) xobs, uniquex = np.unique(tmpxobs, return_index=True) yobs = yobs[uniquex] ### 4. get the corresponding fluxes: fobs, efobs, skyobs, esky = Masphot.GetPhot(tim, xobs, yobs, apr=4., skyrad=[6, 21]) ## and remove the nul values: xobs = xobs[fobs.nonzero()[0]] yobs = yobs[fobs.nonzero()[0]] fobs = fobs[fobs.nonzero()[0]] ### 5. Convert those in magnitude: mobs = -2.5*np.log10(fobs) ##### ----------- PREPARATIONS ------------- ##### ##### for the loop to find matching vortices ##### nomatch = True iters = 0 steps = (min([self.xfov, self.yfov]) * rad2deg)/3 validated = np.array([]) while nomatch and iters < Niter: ### Initial search with a reduced amount of stars validated0 = self.search_pointing(alt, az, vmag, xobs, yobs, mobs, \ full_output=False, dtol=dtol, mtol=mtol) if validated0.any(): validated, inCCD, bpre, bobs = self.search_pointing(alt, az, vmag, \ xobs, yobs, mobs, full_output=True, dtol=dtol, mtol=mtol) if validated.size > validated0.size: nomatch=False else: self.Azo = (self.Azo + steps)%360 if self.Azo <=steps: self.Alto = ((self.Alto+10)%90.)*(self.Alto+10<= 90.) + 20*(1-(self.Alto+10 <=90.)) iters+=1 print 'Iterations {}, altitude {}, azimuth {}'.format(iters, self.Alto, self.Azo) else: if blind: self.Azo = np.random.randint(0, 360) self.Alto = np.random.randint(30,85) else: self.Azo = (self.Azo + steps)%360 if self.Azo <=steps: self.Alto = ((self.Alto+10)%90.)*(self.Alto+10<= 90.) + 20*(1-(self.Alto+10 <=90.)) iters+=1 print 'Iterations {}, altitude {}, azimuth {}'.format(iters, self.Alto, self.Azo) if validated.any(): from lmfit import Parameters,minimize from Masconst import rad2deg rxobs, ryobs, rmobs = xobs[bobs], yobs[bobs], mobs[bobs] tmpalt, tmpaz, tmpmag = alt[inCCD], az[inCCD], vmag[inCCD] tmpalt, tmpaz, tmpmag = tmpalt[bpre], tmpaz[bpre], tmpmag[bpre] pialt, piaz = tmpalt[validated[:,:,1]], tmpaz[validated[:,:,1]] pialt, piaz = np.ravel(pialt), np.ravel(piaz) pixobs, piyobs = np.ravel(rxobs[validated[:,:,0]]), np.ravel(ryobs[validated[:,:,0]]) params = Parameters() params.add('alto', value=self.Alto, min = rad2deg*self.yfov+10) params.add('azo', value=self.Azo) if self.Alto > 85.: params.add('tho', value=self.Tho, vary = False) else: params.add('tho', value=self.Tho, vary=True) params.add('xo', value=self.Xo, vary=False) params.add('yo', value=self.Yo, vary=False) minimize(self.__pointing, params, args=(pixobs, piyobs, pialt, piaz)) stable = self.refine_pointing(rxobs, ryobs, rmobs, tmpalt, tmpaz, tmpmag) if not stable: print 'The current pointing is not stable. Probably not a good one' print 'Give in another set of estimated altitude/azimuth' else: print " Program reached the maximum number of iterations, but couldn't find a pointing. \n" print " You can try again with a larger number of iteration, or change the first pointing estimate?" def search_pointing(self, alt, az, vmag, xobs, yobs, mobs, full_output=False, dtol=2.5, mtol=0.12): ''' Search_pointing tries to match triplets of stars from catalog (alt, az inputs) to observed ones (xobs, yobs) together. The respective triplets are compared to each other from a distance and a magnitude perspective. Inputs: - alt, az, the horizontal coordinates of the visible stars - vmag, the Vmagnitude of those visible stars - xobs, yobs, the CCD positions of the observed stars - mobs, the evaluated magnitude of those observed stars Keywords: - full_output, if True, uses the whole range of visible stars. Else it will restrain itself to the brightest stars. - dtol, the distance tolerance in pixel. It defines the allowed difference in the distance between 2 stars from a triplet - mtol, this defines the magnitude ratio between stars from a triplet ''' from Masfunct import percentile, match_vortex ### Now prepare the catalog data: xpre, ypre, inCCD = self.VisibleStars(alt, az, margin=20) mpre = vmag[inCCD] if not full_output: ### Best way of matching the magnitudes? ### Choose the 15-20% brightest magnitudes in both cases, starting with the catalog: bpre = mpre < percentile(mpre, 0.15) rxpre, rypre, rmpre = xpre[bpre], ypre[bpre], mpre[bpre] adjustsizes = 0. bobs = mobs < percentile(mobs, 0.05) rxobs, ryobs, rmobs = xobs[bobs], yobs[bobs], mobs[bobs] while rxobs.size < rxpre.size : bobs = mobs < percentile(mobs, 0.05+adjustsizes) rxobs, ryobs, rmobs = xobs[bobs], yobs[bobs], mobs[bobs] adjustsizes += 0.01 validated = match_vortex(rxobs, ryobs, rmobs, rxpre, rypre, rmpre, dtol=dtol, mtol=mtol) return validated else: ### Best way of matching the magnitudes? ### Choose the 15-20% brightest magnitudes in both cases, starting with the catalog: bpre = mpre < percentile(mpre, 0.5) rxpre, rypre, rmpre = xpre[bpre], ypre[bpre], mpre[bpre] adjustsizes = 0. bobs = mobs < percentile(mobs, 0.05) rxobs, ryobs, rmobs = xobs[bobs], yobs[bobs], mobs[bobs] while rxobs.size < rxpre.size : bobs = mobs < percentile(mobs, 0.05+adjustsizes) rxobs, ryobs, rmobs = xobs[bobs], yobs[bobs], mobs[bobs] adjustsizes += 0.01 validated = match_vortex(rxobs, ryobs, rmobs, rxpre, rypre, rmpre, dtol=dtol, mtol=mtol) return validated, inCCD, bpre, bobs def refine_pointing(self, xobs, yobs, mobs, alt, az, vmag, dtol=2.5, mtol=0.2): ''' refine_pointing verifies the pointing of the camera. Compared to find_pointing, this function uses more stars and doesn't loop. Either the pointing was more or less correct beforehand and it is here refined. Or it was bad to start with and nothing can be done... ''' from Masfunct import match_vortex pointing0 = self.__sum_pointing() + 10 Niter = 0 dpointing = 10 stable = False while dpointing > .1 and Niter < 15: xpre, ypre, inCCD = self.VisibleStars(alt, az, margin=20) validated = match_vortex(xobs, yobs, mobs, xpre, ypre, vmag[inCCD], dtol=dtol, mtol=mtol) Niter+=1 if validated.any(): from lmfit import Parameters,minimize from Masconst import rad2deg tmpalt, tmpaz = alt[inCCD], az[inCCD] pialt, piaz = tmpalt[validated[:,:,1]], tmpaz[validated[:,:,1]] pialt, piaz = np.ravel(pialt), np.ravel(piaz) pixobs, piyobs = np.ravel(xobs[validated[:,:,0]]), np.ravel(yobs[validated[:,:,0]]) params = Parameters() params.add('alto', value=self.Alto, min = rad2deg*self.yfov+10) params.add('azo', value=self.Azo) if self.Alto >85.: params.add('tho', value=self.Tho, vary=False) else: params.add('tho', value=self.Tho, vary=True) params.add('xo', value=self.Xo, vary=False) params.add('yo', value=self.Yo, vary=False) minimize(self.__pointing, params, args=(pixobs, piyobs, pialt, piaz)) self.__adjust_altitude() dpointing = np.abs(pointing0 - self.__sum_pointing()) pointing0 = self.__sum_pointing() print 'Stability of the pointing after {1} iterations {0}'.format(dpointing, Niter) if dpointing < 1.: stable=True return stable def __sum_pointing(self): ''' computes the sum of the Pointing angles ''' return np.sum(np.array([self.Alto, self.Azo, self.Tho])) def sky_view(self, plot=False): ''' returns an array (180, 360) representing the sky area seen by the camera The routine is usefull to calculate the overlap between several cameras, or the influence of pointing on the sky view ''' nx = self.nx//20 *20 ny = self.ny//20 *20 X, Y = np.meshgrid(np.arange(0,nx, 10), np.arange(0,ny,10)) sky_view = np.zeros((180,360)) for j in range(ny/10): tx = X[j,:] ty = np.ones(nx/10)*j*10 xalt, xaz = self.ProjectionOnSky(tx, ty) for t,z in zip(xalt, xaz): if t < 0: sky_view[int(t)-90,int(z)]=1 else: sky_view[int(t)+90,int(z)]=1 if plot: import matplotlib.pyplot as pl import matplotlib.cm as cm pl.imshow(sky_view, origin='lower', cmap = cm.Greys_r, extent=[0,360, 0,90], aspect='auto') pl.xlabel('Azimuth in degrees') pl.ylabel('Altitude in degrees') pl.title('Field of view of the camera') pl.show() return sky_view def __pointing(self, params, xobs, yobs, alt, az): ''' calculates the residuals between the predicted stellar positions and the observed positions. Inputs: - p, a tuple or list containing the Camera parameters. - xobs, yobs, the coordinates of the observed stars on the CCD - xpre, ypre, the predicted positions Output: - the distance between the observed and the predicted positions ''' self.Alto = params["alto"].value self.Azo = params["azo"].value % 360 ### correct the altitude if the value is above 90deg. self.__adjust_altitude() self.Tho = params["tho"].value % 360 if len(params) > 3: self.Xo = params["xo"].value self.Yo = params["yo"].value xpre, ypre, inCCD = self.VisibleStars(alt, az, local=False, solve=True) if xpre.size != xobs.size: return np.ones(xobs.size)*100 dist= (xpre-xobs)**2+(ypre-yobs)**2 return dist def solveDistortions(self, *args, **kwargs):
[docs] ''' solve the equations to modelise the lens distortions using tnx approach ''' from numpy.linalg import lstsq from scipy.optimize import leastsq from Masfunct import findOutliers ## Get the input parameters: if len(args) == 4: xpix_, ypix_, xi_, yi_ = args else: raise TypeError('solveDistortion accepts 4 parameters (given %g)'%len(args)) ## First test: verify that the 2 sets of coordinates have the same dimensions: if xpix_.size != xi_.size: raise " The two coordinates don't have the same dimension" return ## Get the polynomial orders / solution from the instance if they are defined NewSolution = False xterm = kwargs.get('xterm', 2) if self.tnx['xterm'] != xterm: self.tnx['xterm'] = xterm NewSolution = True xorder = kwargs.get('xorder', 7) if self.tnx['xorder'] != xorder: self.tnx['xorder'] = xorder NewSolution=True yorder = kwargs.get('yorder', 6) if self.tnx['yorder'] != yorder: self.tnx['yorder'] = yorder NewSolution=True niter = kwargs.get('niter', 3) ## Sort the arrays for later when finding outliers in the residuals nsize = nsize0 = xpix_.size xsort = np.argsort(xpix_) xpix = xpix_[xsort] ypix = ypix_[xsort] xi = xi_[xsort] yi = yi_[xsort] for itr in range(niter): nsize0 = nsize ## Solve the linear equations in xi: M = np.array([np.ones(xpix.size), xpix, ypix]).T xCoefs = lstsq(M, xi)[0] ## Solve the linear equations in yi: yCoefs = lstsq(M, yi)[0] self.tnx['crpix'][1] = (yCoefs[0]-yCoefs[1]*xCoefs[0]/xCoefs[1]) / \ (yCoefs[1]/xCoefs[1] * xCoefs[2]-yCoefs[2]) self.tnx['crpix'][0] = (xCoefs[0] + xCoefs[2]*self.tnx['crpix'][1])/(-xCoefs[1]) if xterm == 0: nparams = xorder+yorder elif xterm == 1: nparams = xorder*yorder elif xterm == 2: maxxt = max(xorder, yorder) -1 m, n = np.meshgrid(np.arange(xorder), np.arange(yorder)) mn = m+n nparams = (mn <= maxxt).nonzero()[0].size print 'Parameters to fit : ', nparams self.tnx['cd'][0,0] = xCoefs[1] self.tnx['cd'][0,1] = xCoefs[2] self.tnx['cd'][1,0] = yCoefs[1] self.tnx['cd'][1,1] = yCoefs[2] if (isinstance(self.tnx['coefx'], int)) or NewSolution: params = np.zeros(nparams) else: params = self.tnx['coefx'] resultx = leastsq(self.__solve_xitnx, params, args=(xi, yi, xpix)) self.tnx['coefx'] = resultx[0] if (isinstance(self.tnx['coefy'], int)) or NewSolution: params = np.zeros(nparams) else: params = self.tnx['coefy'] resulty = leastsq(self.__solve_yitnx, params, args=(xi, yi, ypix)) self.tnx['coefy'] = resulty[0] ximodel = self.distortX_tnx(xi, yi) yimodel = self.distortY_tnx(xi, yi) ## Calculating residuals and rejecting outliers for next iteration: xires = (xpix - ximodel) yires = (ypix - yimodel) if itr < niter: gdx = findOutliers(xires, bins=3, sig=2.5) #print 'Good values', gdx.nonzero()[0] xpix = xpix[gdx] ypix = ypix[gdx] xi = xi[gdx] yi = yi[gdx] yires = yires[gdx] gdy = findOutliers(yires, bins=3, sig=2.5) xpix = xpix[gdy] ypix = ypix[gdy] xi = xi[gdy] yi = yi[gdy] nsize = xpix.size if nsize < nparams +5: print 'too much rejected' break if nsize == nsize0: break return xires, yires def __solve_xitnx(self, params, xi, yi, xpix):
''' function to be called using leastsq ''' ximodel = self.distortX_tnx(params, xi, yi) E = ximodel - xpix return E def __solve_yitnx(self, params, xi, yi, ypix): '''same as above''' yimodel = self.distortY_tnx(params, xi, yi) E = yimodel-ypix return E def CorrectPositions(self, *args, **kwargs):
[docs] ''' to correct for small distortions occuring between 2 different images Inputs: - xpre, ypre, the predicted positions of the stars on the CCD Outputs: - xcor, ycor, the corrected positions taking into account small first order distortions ''' xi, yi = args solve=kwargs.get('solve', False) noCorrection=kwargs.get('noCorrection', False) order = kwargs.get('order', 1) if solve or noCorrection: return np.zeros(xi.size), np.zeros(yi.size) xcor = self.correctX(xi, yi, order=order) ycor = self.correctY(xi, yi, order=order) return xcor, ycor def correctX(self, *args, **kwargs):
[docs] ''' TO apply a small correction to the X and Y positions in addition to the global variations. Inputs: - xpre, ypre, *parameters( optional) Output: - dx, the small value to add to x to get the correct position ''' if len(args) ==3: params, xi, yi = args elif len(args) == 2: xi, yi = args params = self.Correction['xCor'] order = self.Correction['orderCor'] X = np.vstack((xi, yi)) dX = params[0] Coefs = params[1:] Coefs.shape = (2,order) for i in range(1,order+1): dX += np.dot(Coefs.T, X**i) xcorr = np.ravel(dX) return xcorr def correctY(self, *args, **kwargs):
[docs] ''' To apply small Y corrections to the predicted y position Inputs: - xpre, ypre, *parameters (optional) Output: - dy the small value to add to y to get the good positions ''' if len(args) == 3: params, xi, yi = args elif len(args)==2: xi, yi = args params = self.Correction['yCor'] order = self.Correction['orderCor'] X = np.vstack((xi,yi)) dY = params[0] Coefs = params[1:] Coefs.shape=(2,order) for i in range(1,order+1): dY += np.dot(Coefs.T, X**i) ycorr = np.ravel(dY) return ycorr def solveCorrections(self, *args, **kwargs):
[docs] ''' to calculate the positional corrections to apply to the X and Y positions Inputs: - image, - alt, az the horizontal cooredinates of the visible stars Keywords: - order, the order of the polynomial correction - debug, if True write down to file the new dx, dy and dist ''' from numpy.linalg import lstsq if len(args) ==3: im, alt, az = args Logfile = False elif len(args) ==4: import multiprocessing import logging im, alt, az, qlog = args logger = logging.getLogger(multiprocessing.current_process().name) logger.setLevel(logging.INFO) Logfile=True order = kwargs.get('order', 1) debug = kwargs.get('debug', False) quiet = kwargs.get('quiet',False) xi, yi, inCCD = self.VisibleStars(alt, az, margin=-20) xobs, yobs, mf, dist_ = self.evaluateAstrometry(im, xi, yi, rad=15) if np.median(dist_) > 2.: highoffset = True else: highoffset=False rate = float(xobs.size)/xi.size self.rate = rate if not quiet: print 'Detection rate : {}'.format(rate) if rate < 0.5: if Logfile: logger.warning('Detection rate= {}, not enough stars => Clouds'.format(rate)) print 'Not enough stars!!! Clouds' return xi = xi[mf] yi = yi[mf] if self.Correction != order: self.Correction['orderCor']=order if order==1: M = np.array([np.ones(xobs.size), xi, yi]).T elif order==2: M = np.array([np.ones(xobs.size), xi, yi, xi**2, yi**2]).T resultX = lstsq(M, xobs) resultY = lstsq(M, yobs) #params = np.zeros((self.Correction['orderCor']+1, 2)) #print params.shape #resultX = leastsq(self.solve_CorrectX, params, args=(xi, yi, xobs)) self.Correction['xCor'] = resultX[0] #resultY = leastsq(self.solve_CorrectY, params, args=(xi, yi, yobs)) self.Correction['yCor'] = resultY[0] if debug: ### prerare a file with the dx and dy position changes... xpre, ypre = self.CorrectPositions(xi, yi) dx = xpre-xobs dy = ypre-yobs dist = np.sqrt(dx**2+dy**2) rmsr = np.sqrt(np.sum(dist**2)/dist.size) rmsx = np.sqrt(np.sum(dx**2)/dx.size) rmsy = np.sqrt(np.sum(dy**2)/dy.size) with open('correctionlog.log', 'a+') as f: line = '{0:1.5f}, {1:1.5f}, {2:1.5f}, {3:1.5f}, {4:1.5f}'.format( \ np.median(dx), np.median(dy), np.median(dist), rmsx, rmsy, rmsr) f.write(line + '\n') return highoffset def invertCorrection(self, xpre, ypre):
[docs] ''' To get back from CCD position to sky ''' from numpy.linalg import inv X = np.array([xpre, ypre]) constX = self.Correction['xCor'][0] constY = self.Correction['yCor'][0] M = np.vstack((self.Correction['xCor'][1:], self.Correction['yCor'][1:])) MB = inv(M) #print Const Xi = np.dot(MB, (X)) xi = Xi[0] - constX yi = Xi[1] - constY return xi, yi def evaluateAstrometry(self, im, xpre, ypre, rad=15, hmin=4.):
[docs] ''' To evaluate how good is the astrometric solution evaluate astrometry uses the inputs coordinates (xpre, ypre) to search within a box of side radx2 any star above hmin x the standard deviation inside that box. Inputs: - im, the reference image for the astrometry - xpre, ypre, the stars positions in the CCD coordinates calculated via the astrometry Keywords: - rad, the size of the box around those coordinates. If rad is too small, the star may not be found. If rad is too big, outside stars may be found. Default is 15 - hmin, threshold for the find routine to detect a pick above the background. Default is 4. By setting hmin smaller one may detect more stars, but the routine also becomes slower. Outputs: - xc, yc, the coordinates of the stars found nearby of the inputs coordinates. xc, yc may be of smaller dimension than xpre, ypre if the astrometry is very bad, or the star too diformed. - matchit, an index array to adapt xpre,ypre to the dimension of xc, yc - mdist, the median distance between the prediction stellar position and the found stellar position. A rejection of outliers is applied before returning. ''' from Masphot import find from Masfunct import findOutliers from scipy.ndimage import filters #from scipy.ndimage import center_of_mass instars = xpre.shape[0] ny, nx = im.shape xc = np.zeros(instars, dtype=np.float64) yc = np.zeros(instars, dtype=np.float64) dist = np.zeros(instars, dtype=np.float64)+100. for i in xrange(instars): lx = (int(xpre[i])-rad)*(int(xpre[i])-rad>0) ly = (int(ypre[i])-rad)*(int(ypre[i])-rad>0) upx = ((int(xpre[i])+rad)%nx)*(int(xpre[i])+rad<nx) + \ (nx*(1-(int(xpre[i])+rad<nx))) upy = ((int(ypre[i])+rad)%ny)*(int(ypre[i])+rad<ny) + \ (ny*(1-(int(ypre[i])+rad<ny))) boxed = im[ly:upy, lx:upx] #print xpre[i], ypre[i] #xm, ym, s,r = find(gboxed, 5.*gboxed.std(), 4.) xm = ym = np.array([]) gboxed = filters.gaussian_filter(boxed, 3., order=0) for fwhm in [4.5, 5, 5.5]: xf, yf, s,r = find(gboxed-np.median(gboxed), bt.nanstd(gboxed), fwhm) if xf is not None: xm = np.hstack((xm, xf)) ym = np.hstack((ym, yf)) xm, uniqueindex = np.unique(xm, return_index=True) ym = ym[uniqueindex] #if xm is not None: Nm = len(xm) #print 'Found %g stars' %Nm if Nm > 0: if Nm >=2: #print xm, ym xtmp = xm+int(xpre[i])-rad ytmp = ym+int(ypre[i])-rad tmp = [im[round(l),round(k)] for k,l in zip(xtmp,ytmp) \ if k < self.nx-1 if l < self.ny-1] #print tmp ti = np.argmax(tmp) xm = xm[ti] ym = ym[ti] xc[i], yc[i] = xm+int(xpre[i])-rad, ym+int(ypre[i])-rad dist[i] = np.sqrt((yc[i]-ypre[i])**2+(xc[i]-xpre[i])**2) matchit = xc.nonzero()[0] xc, yc = xc[matchit], yc[matchit] dist = dist[matchit] ## Quick selection of the good and bad matched stars? dx = xpre[matchit] - xc dy = ypre[matchit] - yc #print 'Found %g stars, starting rejection ...' %dist.size #return xc, yc, dx, dy ## Select optimized bin values? binsize = dist.size/40 + 1 goodvalx = findOutliers(dx, bins = binsize, sig=2.5) xc = xc[goodvalx] yc = yc[goodvalx] dy = dy[goodvalx] matchit = matchit[goodvalx] dist = dist[goodvalx] goodvaly = findOutliers(dy, bins=binsize, sig=2.5) yc = yc[goodvaly] xc = xc[goodvaly] matchit = matchit[goodvaly] dist = dist[goodvaly] return xc,yc, matchit, dist def logAstrometryCheck(self, timejd, lstid, imageshape, out=False, fileroot='', logdir='' ):
[docs] ''' astrocheck calculates the alt-az positions of 5 points on the CCD at the given time. The points are respectively, center of CCD, 1/4 left top corner, 1/4 right top corner, 1/4 bottom left corner and 1/4 bottom right corner. Inputs: - Cam, a camera instance - timejd, the time when the image is taken in JD - lstid, the lstindex of the image - imageshape, a tuple containing the ny, nx dimension of the image Keywords: - out, if True, returns 2 arrays for alt and az. - filename, name of the file which is written and updated to disk - logdir, the directory path to save the file The resulting alt and az are written to file. ''' import os from Masfunct import jd2calendar from datetime import timedelta ny, nx = imageshape upy = ny*7/8 upx = nx*7/8 lwx = nx/8 lwy = ny/8 xpos = np.array([lwx, lwx, nx/2, upx, upx]) ypos = np.array([lwy, upy, ny/2, lwy, upy]) xi, yi = self.invertCorrection(xpos, ypos) alt, az = self.ProjectionOnSky(xi, yi) talt, taz = np.round(alt), np.round(az) xi, yi, init = self.VisibleStars(talt, taz) xc, yc = self.CorrectPositions(xi,yi) datestr = (jd2calendar(timejd)-timedelta(hours=12.)).strftime('%Y%m%d') fileroot = fileroot+self.sname self.fileroot = logdir filename = os.path.join(logdir, fileroot+datestr+'.txt') self.logFileName = filename message = '' if not os.path.exists(filename): message = ('### JD, LSTid, altbottomleft, alttopleft, altcentre, altbottomright,' \ +' alttopright, azbottomleft, aztopleft, azcentre, azbottomright,' \ +' aztopright, x bottom left, x top left, x center, x bottom right,' \ +' x top right, y bottom left, y top left, y center, y bottom right,' \ +' y top right, rate \n') message += ('{0:1.6f}, {1}, {2:1.4f}, {3:1.4f}, {4:1.4f}, {5:1.4f}, {6:1.4f}, ' \ +'{7:1.4f}, {8:1.4f}, {9:1.4f}, {10:1.4f}, {11:1.4f}, {12:1.4f}, ' \ +'{13:1.4f}, {14:1.4f}, {15:1.4f}, {16:1.4f}, {17:1.4f}, {18:1.4f}, '\ +'{19:1.4f}, {20:1.4f}, {21:1.4f}, {22:1.2f}').format(timejd, lstid, alt[0], \ alt[1], alt[2], alt[3], alt[4], az[0], az[1], az[2], az[3], \ az[4], xc[0], xc[1], xc[2], xc[3], xc[4], yc[0], yc[1], \ yc[2], yc[3], yc[4], self.rate) ## write to file: with open(filename, 'a') as f: f.write(message + '\n') if out: return np.array(alt), np.array(az) def readAstroCheck(self, filename='', make_plot = True):
[docs] ''' readAstroCheck reads in the astrocheck file and returns the entries in an array ''' import os if not filename: try: filename = self.logFileName except AttributeError: return thisnight = filename.split(self.sname)[-1] thisnight = thisnight.split('.')[0] if os.path.exists(filename): with open(filename, 'rb') as f: c = f.read() else: return lines = c.split('\n') lines = [l for l in lines if not l.startswith('###') if not l.strip()==''] dtypes = [('jd',np.float64),('lstid', np.uint32), ('altbl', float), ('alttl', float), \ ('altc', float), ('altbr', float), ('alttr', float), ('azbl', float), \ ('aztl', float), ('azc', float), ('azbr', float), ('aztr', float), \ ('xbl',float), ('xtl',float),('xc',float),('xbr',float),('xtr',float),\ ('ybl',float), ('ytl',float),('yc',float),('ybr',float),('ytr',float), \ ('rate', float)] lists = [ [] for n in dtypes] for line in lines: bla = line.split(',') for i, e in enumerate(bla): lists[i].append(dtypes[i][1](e)) check = np.rec.fromarrays(lists, dtype=dtypes) if make_plot: import matplotlib.pyplot as pl pl.figure(1, figsize = (12, 8)) ax00 = pl.subplot(2,2,1) pos1 = ax00.get_position() pos2 = [pos1.x0-0.02, pos1.y0, pos1.width/1.2, pos1.height/1.1] ax00.set_position(pos2) labels = ['bottom left', 'top left', 'centre', 'bottom right', 'top right'] names = check.dtype.names for i, j in enumerate(names[2:7]): pl.plot(check['jd'], (check[j]-np.median(check[j]))*60., '+:', label=labels[i]) pl.xlabel('JD') pl.ylabel('Arcmin') pl.title('Altitude variations') ax01 = pl.subplot(2,2,2) pos1 = ax01.get_position() pos2 = [pos1.x0-0.03, pos1.y0, pos1.width/1.2, pos1.height/1.1] ax01.set_position(pos2) for i, j in enumerate(names[7:12]): pl.plot(check['jd'], (check[j]-np.median(check[j]))*60., '+:', label=labels[i]) pl.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.) pl.xlabel('JD') pl.ylabel('Arcmin') pl.title('Azimuth variations') ax10 = pl.subplot(2,2,3) pos1 = ax10.get_position() pos2 = [pos1.x0-0.02, pos1.y0, pos1.width/1.2, pos1.height/1.1] ax10.set_position(pos2) for i, j in enumerate(names[12:17]): pl.plot(check['jd'], (check[j]-np.median(check[j])), '+:', label=labels[i]) #pl.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.) pl.xlabel('JD') pl.ylabel('Pixel') pl.title('X position variations') ax11 = pl.subplot(2,2,4) pos1 = ax11.get_position() pos2 = [pos1.x0-0.03, pos1.y0, pos1.width/1.2, pos1.height/1.1] ax11.set_position(pos2) for i, j in enumerate(names[17:22]): pl.plot(check['jd'], (check[j]-np.median(check[j])), '+:', label=labels[i]) pl.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.) pl.xlabel('JD') pl.ylabel('Pixel') pl.title('Y position variations') pl.suptitle('Astrometry over the night {} for {}'.format(thisnight, self.name)) pl.savefig(self.fileroot+'AstroCheck.png', dpi=300, facecolor='w', \ orientation='landscape', papertype='A5') pl.close() else: return check def reload(self, backup = False):
[docs] ''' Reloads the camera using the infos from default file ''' if backup: self.getcaminfo(self.filename, backup=True) self.tnx['cd'].shape = (2,2) else: self.getcaminfo(self.filename) self.tnx['cd'].shape = (2,2) return def getcaminfo(self, camname, filename='', backup = False):
[docs] ''' Extract from file the parameters defining the camera. caminfo is an unique file containing: - the pointing vector of the camera (alto, azo) - the orientation of the camera on sky (tho) - the center of the lens on the CCD (Xo, Yo) - the focal length of the lens (focal in mm) - the number of pixel in x and y (nx, ny) - the pixel size ''' import os ### 1. Get the camera name / file name: if filename: self.filename = filename else: temp = camname.split('.dat') if len(temp) ==1: camname = camname + '.dat' self.filename = camname ### 2. Verify if the file exists: if not os.path.exists(self.filename): print 'The camera file does not exist! ' ### 2.1 Verify if backup file exists backcamname = self.filename.replace('.', '_backup.') if not os.path.exists(backcamname): print 'Backup file also does not exists' return else: with open(backcamname, 'rb') as f: c = f.read() else: if backup: backcamname = self.filename.replace('.', '_backup.') with open(backcamname, 'rb') as f: c = f.read() else: with open(self.filename, 'rb') as f: c = f.read() ### 3. Process the file caminfo = c.split('\n') lines = [l for l in caminfo if not l.strip()==''] entries = [] values = [] for line in lines: entrie, value = line.split(' = ') entries.append(entrie.strip()) value = value.split('##')[0].strip() ### Verify if the entry has one or more values: subvalues = value.split(',') if len(subvalues)==1: ## is it a string entry? if value.isalpha(): values.append(value) else: values.append(float(value)) else: temp = value.split(',') values.append(np.array(temp, dtype=float)) entries = np.array(entries, dtype=str) self.name = values[(entries=='name').nonzero()[0]] self.sname = values[(entries=='sname').nonzero()[0]] self.Alto = values[(entries=='alt').nonzero()[0]] self.Azo = values[(entries=='az').nonzero()[0]] self.Tho = values[(entries=='tho').nonzero()[0]] self.Xo = values[(entries=='xo').nonzero()[0]] self.Yo = values[(entries=='yo').nonzero()[0]] self.nx = values[(entries=='nx').nonzero()[0]] self.ny = values[(entries=='ny').nonzero()[0]] self.pixsize = values[(entries=='pixsize').nonzero()[0]] self.focal = values[(entries=='focal').nonzero()[0]] self.delta = values[(entries=='delta').nonzero()[0]] if len(entries) > 12: self.tnx={} for k, v in zip(entries[11:], values[11:]): self.tnx[k]=v self.tnx['xorder'] = int(self.tnx['xorder']) self.tnx['yorder'] = int(self.tnx['yorder']) self.tnx['xterm'] = int(self.tnx['xterm']) else: CD = np.zeros((2,2)) CD[0,0] = CD[1,1] = 1. CRPIX = np.zeros(2) self.tnx = {'cd':CD, 'coefx':0, 'coefy':0, \ 'xterm':0, 'crpix':CRPIX, 'xfit':0, 'yfit':0} return def writeCameraFile(self, **kwargs):
''' to write to file all the parameters which define the camera at a given moment. Input: the filename ''' filename = kwargs.get('filename', '') overwrite = kwargs.get('overwrite', False) backup = kwargs.get('backup', False) if filename =='': if self.filename: filename = self.filename else: filename = self.name + '.dat' ### Verify if the file exists import os if os.path.exists(filename): print 'The file already exists!!' if overwrite: os.remove(filename) ### Preparing the entries: entries = ['name', 'sname', 'alt', 'az', 'tho', 'xo', 'yo', 'nx', 'ny', \ 'pixsize', 'focal', 'xorder', 'yorder', 'xterm', \ 'crpix', 'cd', 'coefx', 'coefy', 'xfit', 'yfit', 'delta'] values = [self.name, self.sname, self.Alto, self.Azo, self.Tho, self.Xo, \ self.Yo, self.nx, self.ny, self.pixsize, self.focal, \ self.tnx['xorder'], self.tnx['yorder'], self.tnx['xterm']] values.append(self.tnx['crpix'].tolist()) values.append(self.tnx['cd'].tolist()) values.append(self.tnx['coefx'].tolist()) values.append(self.tnx['coefy'].tolist()) values.append(self.tnx['xfit'].tolist()) values.append(self.tnx['yfit'].tolist()) values.append(self.delta ) comments=['Camera name', 'Camera Identifier', 'Pointing Altitude', \ 'Pointing Azimuth', 'Camera orientation', 'X centre of lens', \ 'Y centre of lens', 'Number of X pixels', 'Number of Y pixels', \ 'Pixel size in mm', 'focal length of lens in mm', \ 'Polynomial order for the X distortions', \ 'Polynomial order for the Y distortions', \ 'TNX polynomial solution (0=linear, 1=full cross-terms, 2=half cross-terms)', \ 'CRPIX ', 'CD', 'Polynomial Coefficients for the TNX distortions in X', \ 'Polynomial Coefficients for the TNX distortions in Y', \ 'Polynomial coefficients for the remaining loworder aberrations in X', \ 'Polynomial Coefficients for the remaining loworder aberrations in Y', \ 'RMS distance from a star'] for e, val, comm in zip(entries, values, comments): v = '{}'.format(val) v = v.replace('[','') v = v.replace(']','') line = '{:<7} = {:<20} ## {} \n'.format(e, v, comm) with open(filename, 'a+') as f: f.write(line) if backup: import shutil backupname = filename.split('.dat')[0] shutil.copyfile(filename, backupname+'_backup.dat') return def inAstrometricDataBase(self, path = ''): ''' inAstrometricDataBase puts the latest astrometric solution into a database like table to be read. ''' from datetime import datetime import os import pandas as pd ## 1. Prepare the inputs: solocam = {} solocam['xo'] = np.array([self.Xo]) solocam['yo'] = np.array([self.Yo]) solocam['alt'] = np.array([self.Alto]) solocam['az'] = np.array([self.Azo]) solocam['tho'] = np.array([self.Tho]) for k, v in self.tnx.iteritems(): solocam[k] = v ## 2. Does the database already exists? if os.path.exists(path + 'AstrometricDB.h5'): data = pd.read_hdf(path+'AstrometricDB.h5', 'astroDB') names = data.keys() # a. If the camera already has an entry if self.name in names: tmp = data[self.name] tmp[datetime.now().strftime('%Y%m%d %H:%M')] = solocam # b. create entry if not: else: tmp = {} tmp[datetime.now().strftime('%Y%m%d %H:%M')] = solocam data[self.name] = tmp data.to_hdf(path+'AstrometricBD.h5', 'astroDB') ## 3. Create if it doesn't exist else: tmp = {} tmp[datetime.now().strftime('%Y%m%d %H:%M')] = solocam data = {} data[self.name] = tmp data.to_hdf(path+'AstrometricBD.h5', 'astroDB') return