Source code for data.processing

import numpy as np
import ehtim as eh
import os
import matplotlib.pyplot as plt

import ngEHTforecast.fisher.fisher_forecast as ff

[docs]def preprocess_obsdata(obs,ff=None,p=None,snr_cut=None,sys_err=None,const_err=None,avg_time=None,verbosity=0,**kwargs) : """ Applies default preprocessing and creates and returns a new :class:`ehtim.obsdata.Obsdata` object. Flagging and averaging is performed using ehtim functinality. See the source code for the order in which operations are applied. Args: obs (ehtim.obsdata.Obsdata,str): An ehtim Obsdata object containing the desired observing profile or a uvfits file name to read. ff (fisher.fisher_forecast.FisherForecast): A :class:`fisher.fisher_forecast.FisherForecast` child class describing the model for which we wish to apply the processing. Required for applying the SNR cut. Default: None. p (list): Parameter list appropriate for ff. Requires ff to be specified. Default: None. snr_cut (float): SNR below which to exclude baselines. A proxy for a detectiton threshold. Rquires ff and p to be specified. Default: None. sys_err (float): Percent fractional systematic error to add. A proxy for non-closing errors. Default: None. const_err (float): A constant fractional error in mJy to add in quadrature. Default: None. avg_time (float,str): Coherently average data set. If a float, specifies the averaging time in seconds. If 'scan', will create and average on scans. Default: None. verbosity (int): Verbosity level. Default: 0. Returns: (ehtim.obsdata.Obsdata): A new ehtime Obsdata object with the desired processing applied. """ if (isinstance(obs,str)) : _,ext = os.path.splitext(obs) if (ext.lower()=='.uvfits') : obs = eh.obsdata.load_uvfits(obs) else : raise(RuntimeError("Only UVFITS files can be read at this time.")) obs_new = obs.copy() if (not avg_time is None) : if (avg_time=='scan') : obs_new = obs_new.avg_coherent(0,scan_avg=True) else : obs_new = obs_new.avg_coherent(avg_time) if (not ff is None ) : if (p is None) : raise(RuntimeError("Parameter list must be provided if FisherForecast object is not None.")) obs_new.data['vis'] = ff.visibilities(obs,p) if (not snr_cut is None) : if (ff is None) : # raise(Warning("SNR cut is being applied without FisherForecast object.")) print("WARNING: SNR cut is being applied without FisherForecast object.") obs_new = obs_new.flag_low_snr(snr_cut) if (not sys_err is None) : if (ff is None) : # raise(Warning("Systematic error is being applied without FisherForecast object.")) print("WARNING: Systematic error is being applied without FisherForecast object.") obs_new = obs_new.add_fractional_noise(frac_noise=0.01*sys_err) if (not const_err is None) : obs_new.data['sigma'] = np.sqrt(obs_new.data['sigma']**2 + (0.001*const_err)**2) return obs_new
[docs]def display_visibilities(obs,figsize=None,fig=None,axs=None,**kwargs) : """ Plots the visilibility amplitudes, phases, and uncertainties as a function of baseline length for a given :class:`ehtim.obsdata.Obsdata` object. Args: obs (ehtim.obsdata.Obsdata): Observation for which to plot the visibilities. figsize (tuple): Figure size in inches. fig (matplotlib.figure.Figure): Figure object on which to generate plots. axs (list): List of :class:`matplotlib.axes.Axes` objects on which to generate plots. **kwargs (dict): Key word arguments understood by errorbar and plot to be applied. Returns: (matplotlib.figure.Figure,list): Figure object and list of Axes objects as produced by :meth:`matplotlib.figure.Figure.subplots`. """ if (figsize is None) : figsize = (6,8) if (axs is None) : fig,axs = plt.subplots(nrows=3,ncols=1,figsize=figsize,sharex=True) plt.subplots_adjust(left=0.15,bottom=0.1,right=0.975,top=0.975) xmax = 0 elif (fig is None) : fig = plt.gcf() xmax = list(axs[0].get_xlim())[1] else : plt.scf(fig) xmax = list(axs[0].get_xlim())[1] if (not 'alpha' in kwargs.keys()) : kwargs['alpha'] = 0.25 if (not 'marker' in kwargs.keys()) : kwargs['marker'] = '.' if (not 'color' in kwargs.keys()) : kwargs['color'] = 'b' if ( (not 'linestyle' in kwargs.keys()) and (not 'ls' in kwargs.keys()) ) : kwargs['linestyle'] = '' uv = np.sqrt( obs.data['u']**2 + obs.data['v']**2 )/1e9 amp = np.abs(obs.data['vis']) phs = np.angle(obs.data['vis']) sig = obs.data['sigma'] xlim = (-0.1,max(xmax,1.1*np.max(uv))) axs[0].errorbar(uv,amp,yerr=sig,**kwargs) axs[0].set_xlim(xlim) axs[0].set_yscale('log') axs[0].set_ylabel(r'$|V|~({\rm Jy})$') axs[1].errorbar(uv,phs,yerr=sig/amp,**kwargs) axs[1].set_xlim(xlim) axs[1].set_ylabel(r'${\rm arg}(V)~({\rm rad})$') axs[2].plot(uv,1e3*sig,**kwargs) axs[2].set_xlim(xlim) if (np.max(sig)/np.min(sig)>10) : axs[2].set_yscale('log') axs[2].set_ylabel(r'$\sigma~({\rm mJy})$') axs[2].set_xlabel(r'$|u|~({\rm G}\lambda)$') for i in range(3) : axs[i].grid(visible=True,alpha=0.25) return fig,axs
[docs]def display_baselines(obs,figsize=None,fig=None,axes=None,**kwargs) : """ Plots the baseline map for a given :class:`ehtim.obsdata.Obsdata` object. Args: obs (ehtim.obsdata.Obsdata): Observation for which to plot the visibilities. figsize (tuple): Figure size in inches. fig (matplotlib.figure.Figure): Figure object on which to generate plot. axes (matplotlib.axes.Axes): Axes object on which to generate plot. **kwargs (dict): Key word arguments understood by errorbar and plot to be applied. Returns: (matplotlib.figure.Figure,matplotlib.axes.Axes): Figure and Axes object of plots. """ if (figsize is None) : figsize = (5,5) if (axes is None) : fig = plt.figure(figsize=figsize) axes = plt.axes([0.15,0.15,0.8,0.8]) elif (fig is None) : fig = plt.gcf() else : fig = plt.scf(fig) if (not 'alpha' in kwargs.keys()) : kwargs['alpha'] = 0.25 if (not 'marker' in kwargs.keys()) : kwargs['marker'] = '.' if (not 'color' in kwargs.keys()) : kwargs['color'] = 'b' if ( (not 'linestyle' in kwargs.keys()) and (not 'ls' in kwargs.keys()) ) : kwargs['linestyle'] = '' uvlim = 1.1*max(np.max(np.abs(obs.data['u']))/1e9,np.max(np.abs(obs.data['v']))/1e9) axes.plot(obs.data['u']/1e9,obs.data['v']/1e9,**kwargs) axes.plot(-obs.data['u']/1e9,-obs.data['v']/1e9,**kwargs) xlim = list(axes.get_xlim()) ylim = list(axes.get_ylim()) xlim[1] = min(xlim[1],-uvlim) xlim[0] = max(xlim[0],uvlim) ylim[0] = min(ylim[0],-uvlim) ylim[1] = max(ylim[1],uvlim) axes.set_xlim(xlim) axes.set_ylim(ylim) axes.set_xlabel(r'$u~({\rm G}\lambda)$') axes.set_ylabel(r'$v~({\rm G}\lambda)$') axes.grid(visible=True,alpha=0.25) return fig,axes
[docs]def display_trivial_cphases(obs,utriv=1e-2,return_data=False,print_outliers=False) : """ Plots trivial closure phases, defined by those on triangles with a side shorter than **utriv** (by default :math:`0.01\\,{\\rm G}\\lambda`), and the distribution of their normalized residuals. Optionally, will return the trivial closure phase data and/or print any outliers to the screen. Args: obs (ehtim.obsdata.Obsdata): Observation for which to plot the visibilities. utriv (float): Baseline length cutoff in :math:`{\\rm G}\\lambda`, below which a baseline will be considered "trivial". Default: 0.01. return_data (bool): If True, will return the trivial closure phases. print_outliers (bool): If True, will print the most discrepant trivial closure phases, in order of decreasing discrepancy. Returns: (np.recarray): If return_data=True, returns the closure phase data for the trivial triangles. """ utriv *= 1e9 # From Glambda to lambda # Make a local copy so that we don't change the underlying obsdata object obs = obs.copy() # Closure phases obs.add_cphase(count='max') trivial_mask = ( (np.sqrt(obs.cphase['u1']**2+obs.cphase['v1']**2)<utriv) + (np.sqrt(obs.cphase['u2']**2+obs.cphase['v2']**2)<utriv) + (np.sqrt(obs.cphase['u3']**2+obs.cphase['v3']**2)<utriv) ) cpd_trivials = obs.cphase[trivial_mask] cpd_normed = cpd_trivials['cphase']/cpd_trivials['sigmacp'] mean_cpd = np.mean(cpd_normed) std_cpd = np.std(cpd_normed) plt.figure(figsize=(6,8)) plt.axes([0.15,0.6,0.8,0.375]) plt.errorbar(cpd_trivials['time'],cpd_trivials['cphase'],yerr=cpd_trivials['sigmacp'],fmt='.',color='cornflowerblue') plt.xlabel(r'UTC (hr)') plt.ylabel(r'Trivial Closure Phase (deg)') ylim = list(plt.ylim()) plt.ylim((max(-180,ylim[0]),min(180,ylim[1]))) plt.grid(True,alpha=0.25) plt.axes([0.15,0.1,0.8,0.375]) plt.hist(cpd_trivials['cphase'],bins=33,color='cornflowerblue',density=True) plt.xlabel(r'Normalized Closure Phase') plt.ylabel(r'Frequency') plt.grid(True,alpha=0.25) # print("Cphase:",mean_cpd,std_cpd) is_outlier = (np.abs(cpd_normed)>2) cpd_trivial_outliers = cpd_trivials[is_outlier] if (print_outliers) : ilist = np.flipud(np.argsort(np.abs(cpd_trivial_outliers['cphase']/cpd_trivial_outliers['sigmacp']),)) if (len(ilist)>20) : print("%i 2-sigma outliers found. Printing only worst 20."%(len(ilist))) ilist = ilist[:20] print(cpd_trivial_outliers[ilist]) print(" ...") else : print("%i 2-sigma outliers found."%(len(ilist))) print(cpd_trivial_outliers[ilist]) print("-------------------------------------------------------------------") if (return_data) : return cpd_trivials