Source code for fisher.fisher_forecast

import numpy as np
import matplotlib.pyplot as plt
import copy
import hashlib

# import scipy.linalg as linalg
import numpy.linalg as linalg

def _print_matrix(m) :
    for i in range(m.shape[0]) :
        line = ""
        for j in range(m.shape[1]) :
            line = line + ' %10.3g'%(m[i][j])
        print(line)

def _print_vector(v) :
    line = ""
    for i in range(v.shape[0]) :
        line = line + ' %10.3g'%(v[i])
    print(line)

def _invert_matrix(a) :
    tmp = linalg.inv(a)
    return 0.5*(tmp+tmp.T)
    # return linalg.inv(a)
    # return linalg.pinvh(a)
    # n = a.shape[0]
    # I = np.identity(n)
    # return linalg.solve(a, I, sym_pos = True, overwrite_b = True)

def _vMv(v,M) :
    # return np.matmul(v.T,np.matmul(M,v))
    tmp = np.matmul(v.T,np.matmul(M,v))
    return 0.5*(tmp+tmp.T)
    
    
# Some constants
uas2rad = np.pi/180./3600e6            
rad2uas = 1.0/(uas2rad)
sig2fwhm = np.sqrt(8.0*np.log(2.0))
fwhm2sig = 1.0/sig2fwhm    
    
[docs]class FisherForecast : """ Class that collects and contains information for making Fisher-matrix type forecasts for observations of various types. Forms a base class for fully analytical versions that may be application specific. Attributes: size (int): Number of parameters expected by this model. stokes (str): If this is a Stokes I model ('I') or a polarized model ('full'). prior_sigma_list (list): List of standard deviations associated with the parameter priors. covar (numpy.ndarray): Internal space for the computation of the covariance matrix. argument_hash (str): MD5 hash object indicating last state of covariance computation. Used to determine if the covariance needs to be recomputed. """ def __init__(self) : self.size = 0 self.argument_hash = None self.inv_argument_hash = None self.prior_sigma_list = [] self.stokes = 'I' self.covar = None self.invcovar= None self.default_color_list = ['r','b','g','orange','purple'] self.default_cmap_list = ['Reds_r','Blues_r','Greens_r','Oranges_r','Purples_r']
[docs] def visibilities(self,obs,p,verbosity=0) : """ User-defined function in child classes that generates visibilities associated with a given model image object. Args: obs (ehtim.obsdata.Obsdata): An ehtim Obsdata object with a particular set of observation details (u,v positions, times, etc.) p (numpy.ndarray): list of parameters for the model image used to create object. verbosity (int): Verbosity level. Default: 0. Returns: (numpy.ndarray): List of complex visibilities computed at observations. """ raise(RuntimeError("visibilities function not implemented in base class!")) return 0*obs.data['u']
[docs] def visibility_gradients(self,obs,p,verbosity=0,**kwargs) : """ User-defined function in child classes that generates visibility gradients associated with a given model image object. Args: obs (ehtim.obsdata.Obsdata): An ehtim Obsdata object with a particular set of observation details (u,v positions, times, etc.) p (numpy.ndarray): list of parameters for the model image used to create object. verbosity (int): Verbosity level. Default: 0. Returns: (numpy.ndarray): List of complex visibilities gradients computed at observations. """ raise(RuntimeError("visibility_gradients function not implemented in base class!")) return 0*obs.data['u']
[docs] def parameter_labels(self,verbosity=0) : """ User-defined function in child classes that returns a list of of parameter labels to be used in plotting. Args: verbosity (int): Verbosity level. Default: 0. Returns: (list): List of strings with parameter labels. """ raise(RuntimeError("parameter_labels function not implemented in base class!")) return []
[docs] def add_gaussian_prior(self,pindex,sigma,verbosity=0) : """ Add a Gaussian prior on the parameter with the specified index. Args: pindex (int): Index of the parameter on which to place the prior. sigma (float): Standard deviation of the Gaussian prior to apply. verbosity (int): Verbosity level. Default: 0. """ if (pindex>self.size) : raise(RuntimeError("Parameter %i does not exist, expected in [0,%i]."%(pindex,self.size-1))) if (len(self.prior_sigma_list)==0) : self.prior_sigma_list = self.size*[None] self.prior_sigma_list[pindex] = sigma self.argument_hash = None
[docs] def add_gaussian_prior_list(self,sigma_list,verbosity=0) : """ Add a Gaussian priors on all of the model parameters. Args: sigma_list (list): List of standard deviations of the Gaussian priors to apply. verbosity (int): Verbosity level. Default: 0. """ self.prior_sigma_list = copy.copy(sigma_list) if (len(self.prior_sigma_list)!=self.size) : raise(RuntimeError("Priors must be specified for all parameters if set by list. If sigma is None, no prior will be applied.")) self.argument_hash = None
[docs] def generate_image(self,p,limits=None,shape=None,verbosity=0) : """ Generate and return an image for the parent visibility model evaluated at a given set of parameter values. The user is responsible for setting the plot limits and shape intelligently. Note that this uses FFTs and assumes that the resolution and field of view are sufficient to adequately resolve model image features. Args: p (list): List of parameter values. limits (float,list): Limits on the field of view in which to construct the image in uas. Either a single float, in which case the limits are symmetric and set to [-limits,limits,-limits,limits], or a list of floats. Default: 100. shape (int,list): Dimensions of the image. If an int, the dimensions are equal. If a two-element list of ints, sets the dimensions to the two values. Default: 256. verbosity (int): Verbosity level. Default: 0. Returns: (numpy.ndarray,numpy.ndarray,numpy.ndarray): x positions of the pixel centers in uas, y positions of the pixel centers in uas, intensities at pixel centers in Jy/uas^2. """ # Fix image limits if (limits is None) : limits = [-100,100,-100,100] elif (isinstance(limits,list)) : limits = limits else : limits = [-limits, limits, -limits, limits] # Set shape if (shape is None) : shape = [256, 256] elif (isinstance(shape,list)) : shape = shape else : shape = [shape, shape] if (verbosity>0) : print("limits:",limits) print("shape:",shape) umax = 0.5*shape[0]/(uas2rad*(limits[1]-limits[0])) vmax = 0.5*shape[1]/(uas2rad*(limits[3]-limits[2])) if (verbosity>0) : print("Max (u,v) = ",(umax,vmax)) # FIX class obsempty : data = {} u,v = np.mgrid[-umax:umax:(shape[0])*1j, -vmax:vmax:(shape[1])*1j] obsgrid = obsempty() obsgrid.data['u'] = u obsgrid.data['v'] = v V = self.visibilities(obsgrid,p,verbosity=verbosity) I = np.abs(np.fft.fftshift(np.fft.fft2(V))) x1d = np.fft.fftshift(np.fft.fftfreq(I.shape[0],np.abs(u[1,1]-u[0,0]))) y1d = np.fft.fftshift(np.fft.fftfreq(I.shape[1],np.abs(v[1,1]-v[0,0]))) x,y = np.meshgrid(x1d,y1d,indexing='ij') x = x/uas2rad y = y/uas2rad obsgrid.data['u'] = 0.0 obsgrid.data['v'] = 0.0 I = I * np.abs(self.visibilities(obsgrid,p,verbosity=verbosity))/np.sum(I) / ((x[1,1]-x[0,0])*(y[1,1]-y[0,0])) if (verbosity>0) : print("V00:",self.visibilities(obsgrid,p)) print("Sum I:", np.sum(I) * ((x[1,1]-x[0,0])*(y[1,1]-y[0,0])) ) return x,y,I
[docs] def display_image(self,p,limits=None,shape=None,verbosity=0,**kwargs) : """ Generate and plot an image for the parent visibility model evaluated at a given set of parameter values. The user is responsible for setting the plot limits and shape intelligently. Note that this uses FFTs and assumes that the resolution and field of view are sufficient to adequately resolve model image features. Args: p (list): List of parameter values. limits (float,list): Limits on the field of view in which to construct the image in uas. Either a single float, in which case the limits are symmetric and set to [-limits,limits,-limits,limits], or a list of floats. Default: 100. shape (int,list): Dimensions of the image. If an int, the dimensions are equal. If a two-element list of ints, sets the dimensions to the two values. Default: 256. verbosity (int): Verbosity level. Default: 0. Returns: (matplotlib.pyplot.figure,matplotlib.pyplot.axes,matplotlib.pyplot.colorbar): Handles to the figure, axes, and colorbar. """ plt.figure(figsize=(6.5,5)) axs=plt.axes([0.15,0.15,0.8*5/6.5,0.8]) x,y,I = self.generate_image(p,limits=limits,shape=shape,verbosity=verbosity) Imax = np.max(I) if (Imax>1e-1) : fu = r'$I~({\rm Jy}/\mu{\rm as}^2)$' elif (Imax>1e-4) : fu = r'$I~({\rm mJy}/\mu{\rm as}^2)$' I = I*1e3 elif (Imax>1e-7) : fu = r'$I~({\rm \mu Jy}/\mu{\rm as}^2)$' I = I*1e6 elif (Imax>1e-10) : fu = r'$I~({\rm nJy}/\mu{\rm as}^2)$' I = I*1e9 elif (Imax>1e-13) : fu = r'$I~({\rm pJy}/\mu{\rm as}^2)$' I = I*1e12 plt.pcolormesh(x,y,I,cmap='afmhot',vmin=0,shading='auto') plt.xlabel(r'$\Delta{\rm RA}~(\mu{\rm as})$') plt.ylabel(r'$\Delta{\rm Dec}~(\mu{\rm as})$') plt.gca().invert_xaxis() cbax = plt.axes([0.8*5/6.5+0.05+0.15,0.15,0.05,0.8]) plt.colorbar(cax=cbax) cbax.set_ylabel(fu,rotation=-90,ha='center',va='bottom') plt.sca(axs) return plt.gcf(),axs,cbax
[docs] def check_gradients(self,obs,p,h=None,verbosity=0) : """ Numerically evaluates the gradients using the visibilities function and compares them to the gradients returned by the visibility_gradients function. Numerical differentiation is 2nd-order, centered finite differences. Results are output to standard out. For best results, the user should pass some option for the step size h to use for numerical differentiation. Args: obs (ehtim.obsdata.Obsdata): An ehtim Obsdata object with a particular set of observation details (u,v positions, times, etc.) at which to check gradients.. p (list): List of parameter values. h (float,list): List of small steps which define the finite differences. If None, uses 1e-4*p. If a float, uses h*p. If a list, sets each step size separately. verbosity (int): Verbosity level. Default: 0. """ if (h is None) : h = np.array(len(p)*[1e-4]) elif (not isinstance(h,list)) : h = np.array(len(p)*[h]) if self.stokes == 'I': gradV_an = self.visibility_gradients(obs,p,verbosity=verbosity) gradV_fd = [] q = np.copy(p) for i in range(self.size) : q[i] = p[i]+h[i] Vp = self.visibilities(obs,q,verbosity=verbosity) q[i] = p[i]-h[i] Vm = self.visibilities(obs,q,verbosity=verbosity) q[i] = p[i] gradV_fd.append((Vp-Vm)/(2.0*h[i])) gradV_fd = np.array(gradV_fd).T else: gradV_an_RR, gradV_an_LL, gradV_an_RL, gradV_an_LR = self.visibility_gradients(obs,p,verbosity=verbosity) gradV_an = gradV_an_RR + gradV_an_LL + gradV_an_RL + gradV_an_LR gradV_fd = [] q = np.copy(p) for i in range(self.size) : q[i] = p[i]+h[i] Vp_RR, Vp_LL, Vp_RL, Vp_LR = self.visibilities(obs,q,verbosity=verbosity) q[i] = p[i]-h[i] Vm_RR, Vm_LL, Vm_RL, Vm_LR = self.visibilities(obs,q,verbosity=verbosity) q[i] = p[i] gradV_fd.append(((Vp_RR-Vm_RR)/(2.0*h[i])) + ((Vp_LL-Vm_LL)/(2.0*h[i])) + ((Vp_RL-Vm_RL)/(2.0*h[i])) + ((Vp_LR-Vm_LR)/(2.0*h[i]))) gradV_fd = np.array(gradV_fd).T lbls = self.parameter_labels() err = (gradV_fd-gradV_an) print("Gradient Check Report:") print(" Sample of errors by parameter") print(" %30s %15s %15s %15s %15s %15s"%("Param.","u","v","anal. grad","fd. grad","err")) for i in range(self.size) : for j in range(min(len(obs.data['u']),10)) : print(" %30s %15.8g %15.8g %15.8g %15.8g %15.8g"%(lbls[i],obs.data['u'][j],obs.data['v'][j],gradV_an[j][i],gradV_fd[j][i],err[j][i])) mfe_arg = np.argmax(err) mfe_i = mfe_arg%self.size mfe_j = mfe_arg//self.size max_err = np.max(err) print(" Global Maximum error:",max_err) print(" %30s %15.8g %15.8g %15.8g %15.8g %15.8g"%(lbls[mfe_i],obs.data['u'][mfe_j],obs.data['v'][mfe_j],gradV_an[mfe_j][mfe_i],gradV_fd[mfe_j][mfe_i],err[mfe_j][mfe_i]))
[docs] def fisher_covar(self,obs,p,verbosity=0,**kwargs) : """ Returns the Fisher matrix as defined in the accompanying documentation. Intelligently avoids recomputation if the observation and parameters are unchanged. Args: obs (ehtim.obsdata.Obsdata): An Obsdata object containing the observation particulars. p (list): List of model parameters at which to compute the Fisher matrix. verbosity (int): Verbosity level. Default: 0. Returns: (numpy.ndarray): The Fisher matrix. """ # Get the fisher covariance new_argument_hash = hashlib.md5(bytes(str(obs)+str(p)+str(self.prior_sigma_list),'utf-8')).hexdigest() if ( new_argument_hash == self.argument_hash ) : return self.covar else : self.argument_hash = new_argument_hash if self.stokes == 'I': obs = obs.switch_polrep('stokes') gradV = self.visibility_gradients(obs,p,verbosity=verbosity,**kwargs) self.covar = np.zeros((self.size,self.size)) for i in range(self.size) : for j in range(self.size) : self.covar[i][j] = np.sum( np.conj(gradV[:,i])*gradV[:,j]/obs.data['sigma']**2 + gradV[:,i]*np.conj(gradV[:,j])/obs.data['sigma']**2) else: obs = obs.switch_polrep('circ') grad_RR, grad_LL, grad_RL, grad_LR = self.visibility_gradients(obs,p,verbosity=verbosity,**kwargs) self.covar = np.zeros((self.size,self.size)) for i in range(self.size) : for j in range(self.size) : self.covar[i][j] = np.sum( np.conj(grad_RR[:,i])*grad_RR[:,j]/obs.data['rrsigma']**2 + grad_RR[:,i]*np.conj(grad_RR[:,j])/obs.data['rrsigma']**2) self.covar[i][j] += np.sum( np.conj(grad_LL[:,i])*grad_LL[:,j]/obs.data['llsigma']**2 + grad_LL[:,i]*np.conj(grad_LL[:,j])/obs.data['llsigma']**2) self.covar[i][j] += np.sum( np.conj(grad_RL[:,i])*grad_RL[:,j]/obs.data['rlsigma']**2 + grad_RL[:,i]*np.conj(grad_RL[:,j])/obs.data['rlsigma']**2) self.covar[i][j] += np.sum( np.conj(grad_LR[:,i])*grad_LR[:,j]/obs.data['lrsigma']**2 + grad_LR[:,i]*np.conj(grad_LR[:,j])/obs.data['lrsigma']**2) if (verbosity>0) : print("FisherCovar covar before priors:") _print_matrix(self.covar) if (len(self.prior_sigma_list)>0) : for i in range(self.size) : if (not self.prior_sigma_list[i] is None) : self.covar[i][i] += 2.0/(self.prior_sigma_list[i]**2) # Why factor of 2? if (verbosity>0) : print("FisherCovar covar after priors:") _print_matrix(self.covar) if (verbosity>1) : print("FisherCovar priors:",self.prior_sigma_list) print("Dimensions:",self.covar.shape) return self.covar
[docs] def inverse_fisher_covar(self,obs,p,verbosity=0,**kwargs) : """ Returns the inverse of the Fisher matrix as defined in the accompanying documentation. Intelligently avoids recomputation if the observation and parameters are unchanged. Args: obs (ehtim.obsdata.Obsdata): An Obsdata object containing the observation particulars. p (list): List of model parameters at which to compute the Fisher matrix. verbosity (int): Verbosity level. Default: 0. Returns: (numpy.ndarray): The inverse of the Fisher matrix. """ new_inv_argument_hash = hashlib.md5(bytes(str(obs)+str(p)+str(self.prior_sigma_list),'utf-8')).hexdigest() if ( new_inv_argument_hash == self.inv_argument_hash ) : return self.invcovar else : self.inv_argument_hash = new_inv_argument_hash self.invcovar = _invert_matrix(self.fisher_covar(obs,p,verbosity=verbosity,**kwargs)) return self.invcovar
[docs] def uniparameter_uncertainties(self,obs,p,ilist=None,verbosity=0,**kwargs) : """ Computes the uncertainties on a subset of model parameters, fixing all others. Note that this is not a covariance; each uncertainty is for a single parameter, fixing all others. This is probably not what is wanted for forecasting, see marginalized_uncertainties. Args: obs (ehtim.obsdata.Obsdata): An Obsdata object containing the observation particulars. p (list): List of parameter values at which to compute uncertainties. ilist (int,list) : Index or list of indicies for which to compute the uncertainties. If None will return a list of all uncertainties. Default: None. verbosity (int): Verbosity level. Default: 0. Returns: (float/list): Parameter uncertainty or list of marginalized uncertainties of desired parameters. """ C = self.fisher_covar(obs,p,verbosity=verbosity,**kwargs) if (ilist is None) : ilist = np.arange(self.size) if (isinstance(ilist,int)) : N = C[ilist][ilist] Sig_uni = np.sqrt(2.0/N) else : Sig_uni = np.zeros(self.size) for i in ilist : N = C[i][i] Sig_uni[i] = np.sqrt(2.0/N) return Sig_uni
[docs] def marginalized_uncertainties(self,obs,p,ilist=None,verbosity=0,**kwargs) : """ Computes the uncertainties on a subset of model parameters, marginalized over all others. Note that this is not the marginalized covariance; each uncertainty is for a single parameter, marginalizing out all others. Args: obs (ehtim.obsdata.Obsdata): An Obsdata object containing the observation particulars. p (list): List of parameter values at which to compute uncertainties. ilist (int,list) : Index or list of indicies for which to compute the marginalized uncertainties. If None will return a list of all marginalized uncertainties. Default: None. verbosity (int): Verbosity level. Default: 0. Returns: (float/list): Marginalized parameter uncertainty or list of marginalized uncertainties of desired parameters. """ C = self.fisher_covar(obs,p,verbosity=verbosity,**kwargs) # M = np.zeros((self.size-1,self.size-1)) # v = np.zeros(self.size-1) if (ilist is None) : ilist = np.arange(self.size) if (isinstance(ilist,int)) : i = ilist Cinv = self.inverse_fisher_covar(obs,p,verbosity=verbosity,**kwargs) mN = _invert_matrix(Cinv[[ilist],:][:,[ilist]]) Sig_marg = np.sqrt(2.0/mN[0,0]) # ilist = np.arange(self.size) # ini = ilist[ilist!=i] # M = C[ini,:][:,ini] # v = C[i,ini] # N = C[i][i] # v,M = self._condition_vM(v,M) # Minv = _invert_matrix(M) # mN = (N - _vMv(v,Minv)) # Sig_marg = np.sqrt(2.0/mN) else : Cinv = self.inverse_fisher_covar(obs,p,verbosity=verbosity,**kwargs) Sig_marg = np.sqrt(2.0*Cinv[ilist,ilist]) # Sig_marg = np.zeros(len(ilist)) # iall = np.arange(self.size) # for k,i in enumerate(ilist) : # ini = iall[iall!=i] # M = C[ini,:][:,ini] # v = C[ini,i] # N = C[i][i] # v,M = self._condition_vM(v,M) # Minv = _invert_matrix(M) # mN = N - _vMv(v,Minv) # Sig_marg[k] = np.sqrt(2.0/mN) return Sig_marg
[docs] def marginalized_covariance(self,obs,p,ilist=None,verbosity=0,**kwargs) : """ Computes the covariance for a subset of model parameters, marginalized over all parameters outside of the subset. Args: obs (ehtim.obsdata.Obsdata): An Obsdata object containing the observation particulars. p (list): List of parameter values at which to compute uncertainties. ilist (int,list) : Index or list of indicies for which to compute the marginalized uncertainties. If None will return a list of all marginalized uncertainties. Default: None. verbosity (int): Verbosity level. Default: 0. Returns: (numpy.ndarray): Marginalized covariance for desired parameter subset. """ C = self.fisher_covar(obs,p,verbosity=verbosity,**kwargs) if (ilist is None) : return C elif (isinstance(ilist,int)) : return self.marginalized_uncertainties(obs,p,ilist=ilist,verbosity=0,**kwargs) else : Cinv = self.inverse_fisher_covar(obs,p,verbosity=verbosity,**kwargs) return ( _invert_matrix(Cinv[ilist,:][:,ilist]) )
# iall = np.arange(self.size) # isni = (iall!=ilist[0]) # for i in ilist[1:] : # isni = isni*(iall!=i) # ini = iall[isni] # n = C[ilist,:][:,ilist] # r = C[ini,:][:,ilist] # m = C[ini,:][:,ini] # r,m = self._condition_vM(r,m) # return n - _vMv(r,_invert_matrix(m))
[docs] def uncertainties(self,obs,p,ilist=None,verbosity=0,**kwargs) : """ Computes the uncertainties on a subset of model parameters, both fixing and marginalizing over all others. Args: obs (ehtim.obsdata.Obsdata): An Obsdata object containing the observation particulars. p (list): List of parameter values at which to compute uncertainties. ilist (int,list) : Index or list of indicies for which to compute the marginalized uncertainties. If None will return a list of all marginalized uncertainties. Default: None. verbosity (int): Verbosity level. Default: 0. Returns: (float/list,float/list): Parameter uncertainty or list of marginalized uncertainties of desired parameters, marginalized parameter uncertainty or list of marginalized uncertainties of desired parameters. """ C = self.fisher_covar(obs,p,verbosity=verbosity,**kwargs) Cinv = self.inverse_fisher_covar(obs,p,verbosity=verbosity,**kwargs) if (verbosity>1) : print("Fisher Matrix:") _print_matrix(C) Sig_uni = self.uniparameter_uncertainties(obs,p,ilist,verbosity=verbosity,**kwargs) Sig_marg = self.marginalized_uncertainties(obs,p,ilist,verbosity=verbosity,**kwarg) # if (ilist is None) : # ilist = np.arange(self.size) # if (isinstance(ilist,int)) : # i = ilist # ilist = np.arange(self.size) # ini = ilist[ilist!=i] # M = C[ini,:][:,ini] # v = C[i,ini] # N = C[i][i] # v,M = self._condition_vM(v,M) # Minv = _invert_matrix(M) # mN = N - _vMv(v,Minv) # Sig_uni[i] = np.sqrt(2.0/N) # Sig_marg[i] = np.sqrt(2.0/mN) # else : # Sig_uni = np.zeros(self.size) # Sig_marg = np.zeros(self.size) # M = np.zeros((self.size-1,self.size-1)) # v = np.zeros(self.size-1) # ilist = np.arange(self.size) # for i in ilist : # ini = ilist[ilist!=i] # M = C[ini,:][:,ini] # v = C[i,ini] # N = C[i][i] # v,M = self._condition_vM(v,M) # Minv = _invert_matrix(M) # mN = N - _vMv(v,Minv) # Sig_uni[i] = np.sqrt(2.0/N) # Sig_marg[i] = np.sqrt(2.0/mN) # if (verbosity>1) : # print("Submatrix (%i):"%(i)) # _print_matrix(M) # print("Submatrix inverse (%i):"%(i)) # _print_matrix(Minv) # print("Subvectors v1 (%i):"%(i)) # _print_vector(v) # print("N,mN (%i):"%(i),N,mN) return Sig_uni, Sig_marg
[docs] def sample_posterior(self, obs, p, N, ilist=None, verbosity=0, **kwargs) : """ Generates a chain of samples appropriate for post-processing as done in MCMC analyses. Such chains may be useful for computing the posterior on complicated non-linear quantities. Args: obs (ehtim.obsdata.Obsdata): An Obsdata object containing the observation particulars. p (list): List of parameter values at which to compute uncertainties. N (int): Number of samples to generate. ilist (list): List of parameters to sample, marginalizing over all others. If None, all parameters are sampled. Default: None. Returns: (numpy.ndarray): Two-dimensional array containing chain of parameter samples. """ mC = self.marginalized_covariance(obs,p,ilist) l,e = np.linalg.eigh(mC) if (verbosity>0) : print("Eigenvalues ---") _print_vector(l) Sig = np.sqrt(2.0/l) chain = np.random.normal(size=(N,len(Sig))) * Sig chain = np.matmul(chain,e.T) return chain
[docs] def joint_biparameter_chisq(self,obs,p,i1,i2,kind='marginalized',verbosity=0,**kwargs) : """ Computes the ensemble-averaged 2nd-order contribution to the chi-square for two parameters after fixing or marginalizing over all others. Args: obs (ehtim.obsdata.Obsdata): An Obsdata object containing the observation particulars. p (list): List of parameter values at which to compute uncertainties. i1 (int): Index of first parameter. i2 (int): Index of second parameter. kind (str): Choice of what to do with other parameters. Choices are 'marginalized', 'fixed'. Default: 'marginalized'. verbosity (int): Verbosity level. Default: 0. Returns: (numpy.ndarray,numpy.ndarray,numpy.ndarray): :math:`p_{i1}`, :math:`p_{i2}`, and :math:`\\chi^2` on a grid of points that covers the inner :math:`4.5\\Sigma` region. """ ilist = [i1,i2] if (kind.lower()=='fixed') : # pass C = self.inverse_fisher_covar(obs,p,verbosity=verbosity,**kwargs) Ci1i2 = C[ilist,:][:,ilist] elif (kind.lower()=='marginalized') : # Cinv = self.inverse_fisher_covar(obs,p,verbosity=verbosity,**kwargs) # Ci1i2 = _invert_matrix(Cinv[ilist,:][:,ilist]) # if (verbosity>1) : # print("Fisher Matrix:") # _print_matrix(C) # print("Submatrix (%i,%i):"%(i1,i2)) # _print_matrix(M) # print("Subvectors v1:") # _print_vector(v1) # print("Subvectors v2:") # _print_vector(v2) C = self.fisher_covar(obs,p,verbosity=verbosity,**kwargs) M = np.zeros((self.size-2,self.size-2)) v1 = np.zeros(self.size-2) v2 = np.zeros(self.size-2) ilist = np.arange(self.size) ini12 = ilist[(ilist!=i1)*(ilist!=i2)] for j2,j in enumerate(ini12) : for k2,k in enumerate(ini12) : M[j2,k2] = C[j][k] v1[j2] = C[i1][j] v2[j2] = C[i2][j] N1 = C[i1][i1] N2 = C[i2][i2] C12 = C[i1][i2] vv = np.vstack([v1,v2]).T vv,M = self._condition_vM(vv,M) v1 = vv[:,0] v2 = vv[:,1] Minv = _invert_matrix(M) N1 = N1 - _vMv(v1,Minv) N2 = N2 - _vMv(v2,Minv) C12 = C12 - 0.5*(np.matmul(v1.T,np.matmul(Minv,v2)) + np.matmul(v2.T,np.matmul(Minv,v1))) Ci1i2 = np.array([[N1, C12],[C12, N2]]) else : raise(RuntimeError("Received unexpected value for kind, %s. Allowed values are 'fixed' and 'marginalized'."%(kind))) # Set some names N1 = Ci1i2[0,0] N2 = Ci1i2[1,1] C12 = 0.5*(Ci1i2[0,1]+Ci1i2[1,0]) # Find range with eigenvectors/values l1 = 0.5*( (N1+N2) + np.sqrt( (N1-N2)**2 + 4.0*C12**2 ) ) l2 = 0.5*( (N1+N2) - np.sqrt( (N1-N2)**2 + 4.0*C12**2 ) ) e1 = np.array( [ C12, l1-N1 ] ) e2 = np.array( [ e1[1], -e1[0] ] ) e1 = e1/ np.sqrt( e1[0]**2+e1[1]**2 ) e2 = e2/ np.sqrt( e2[0]**2+e2[1]**2 ) if (l1<=0 or l2<=0) : print("Something's wrong! Variances are nonpositive!") print(i1,i2) print(l1,l2) print(N1,N2,C12) # print(mN1,mN2,mC12) # print(C) print("--- covar i1,i2 ---") _print_matrix(Ci1i2) print("Eigenvalues:",np.linalg.eigvalsh(Ci1i2)) l1 = max(1e-10,l1) l2 = max(1e-10,l2) if (l1>l2 ): Sig_maj = np.sqrt(1.0/l2) Sig_min = np.sqrt(1.0/l1) e_maj = np.copy(e2) e_min = np.copy(e1) else : Sig_maj = np.sqrt(1.0/l1) Sig_min = np.sqrt(1.0/l2) e_maj = np.copy(e1) e_min = np.copy(e2) dp1 = 4.5*( Sig_maj*np.abs(e_maj[0]) + Sig_min*np.abs(e_min[0]) ) dp2 = 4.5*( Sig_maj*np.abs(e_maj[1]) + Sig_min*np.abs(e_min[1]) ) Npx = int(max(128,min(16*Sig_maj/Sig_min,1024))) p1,p2 = np.meshgrid(np.linspace(-dp1,dp1,Npx),np.linspace(-dp2,dp2,Npx)) csq = 0.5*(N1*p1**2 + 2*C12*p1*p2 + N2*p2**2) return p1,p2,csq
[docs] def plot_1d_forecast(self,obs,p,i1,kind='marginalized',labels=None,fig=None,axes=None,colors=None,alphas=0.25,verbosity=0,**kwargs) : """ Plot the marginalized posterior for a single parameter for a given observation. Args: obs (list,Obsdata): An Obsdata object or list of Obsdata objects containing the observation particulars. p (list): List of parameter values at which to compute uncertainties. i1 (int): Index of parameter to be plotted. kind (str): Choice of what to do with other parameters. Choices are 'marginalized', 'fixed'. Default: 'marginalized'. labels (list,str): A list of labels for each Obsdata object. When fewer labels than observations are provided, the first set of observations are labeled. Default: None. fig (matplotlib.figure.Figure): Figure on which to place plot. If None, a new figure will be created. Default: None. axes (matplotlib.axes.Axes): Axes on which to place plot. If None, a new axes will be created. Default: None. colors (list,str): A color or list of colors for the plots. When fewer colors than observations are provided, they will be cycled through. If None, a default list will be used. Default: None. alphas (list,float): An alpha value or list of alpha values for the filled portion of the plots. Default: 0.25. verbosity (int): Verbosity level. Default: 0. Returns: (matplotlib.figure.Figure, matplotlib.axes.Axes): Handles to the figure and axes objects in the plot. """ if (isinstance(obs,list)) : obslist = obs else : obslist = [obs] if (labels is None) : labels = len(obslist)*[None] elif (not isinstance(labels,list)) : labels = [labels] if (len(labels)<len(obslist)) : raise(RuntimeError("Label list must have the same size as the observation list.")) if (axes is None) : if (fig is None) : plt.figure(figsize=(5,4)) plt.axes([0.15,0.15,0.8,0.8]) else : plt.sca(axes) if (colors is None) : colors = self.default_color_list elif (isinstance(colors,list)) : pass else : colors = [colors] if (isinstance(alphas,list)) : pass else : alphas = [alphas] for k,obs in enumerate(obslist) : if (kind=='marginalized') : Sigma = self.marginalized_uncertainties(obs,p,ilist=i1) elif (kind=='fixed') : Sigma = self.uniparameter_uncertainties(obs,p,ilist=i1) else : raise(RuntimeError("Received unexpected value for kind, %s. Allowed values are 'fixed' and 'marginalized'."%(kind))) x = np.linspace(-5*Sigma,5*Sigma,256) y = np.exp(-x**2/(2.0*Sigma**2)) / np.sqrt(2.0*np.pi*Sigma**2) plt.fill_between(x,y,y2=0,alpha=self._choose_from_list(alphas,k),color=self._choose_from_list(colors,k)) plt.plot(x,y,'-',color=self._choose_from_list(colors,k),label=labels[k]) plt.xlabel(self.parameter_labels()[i1]) plt.yticks([]) plt.ylim(bottom=0) if (not (np.array(labels)==None).all()) : plt.legend() return plt.gcf(),plt.gca()
[docs] def plot_2d_forecast(self,obs,p,i1,i2,kind='marginalized',labels=None,fig=None,axes=None,colors=None,cmaps=None,alphas=0.75,verbosity=0,**kwargs) : """ Plot the joint marginalized posterior for a pair of parameters for a given observation. Args: obs (list,Obsdata): An Obsdata object or list of Obsdata objects containing the observation particulars. p (list): List of parameter values at which to compute uncertainties. i1 (int): Index of first parameter to be plotted. i2 (int): Index of second parameter to be plotted. kind (str): Choice of what to do with other parameters. Choices are 'marginalized', 'fixed'. Default: 'marginalized'. labels (list,str): A list of labels for each Obsdata object. When fewer labels than observations are provided, the first set of observations are labeled. Default: None. fig (matplotlib.figure.Figure): Figure on which to place plot. If None, a new figure will be created. Default: None. axes (matplotlib.axes.Axes): Axes on which to place plot. If None, a new axes will be created. Default: None. colors (list,str): A color or list of colors for the plots. When fewer colors than observations are provided, they will be cycled through. If None, a default list will be used. Default: None. cmaps (list,matplotlib.colors.Colormap): A colormap or list of colormaps for the plots. When fewer colormaps than observations are provided, they will be cycled through. If None, a default list will be used. Default: None. alphas (list,float): An alpha value or list of alpha values for the filled portion of the plots. Default: 0.75. verbosity (int): Verbosity level. Default: 0. Returns: (matplotlib.figure.Figure, matplotlib.axes.Axes): Handles to the figure and axes objects in the plot. """ if (isinstance(obs,list)) : obslist = obs else : obslist = [obs] if (labels is None) : labels = len(obslist)*[None] elif (not isinstance(labels,list)) : labels = [labels] if (len(labels)<len(obslist)) : raise(RuntimeError("Label list must have the same size as the observation list.")) if (axes is None) : if (fig is None) : plt.figure(figsize=(5,5)) plt.axes([0.2,0.2,0.75,0.75]) else : plt.sca(axes) if (colors is None) : colors = self.default_color_list elif (isinstance(colors,list)) : pass else : colors = [colors] if (cmaps is None) : cmaps = self.default_cmap_list elif (isinstance(cmaps,list)) : pass else : cmaps = [cmaps] if (isinstance(alphas,list)) : pass else : alphas = [alphas] for k,obs in enumerate(obslist) : d,w,csq = self.joint_biparameter_chisq(obs,p,i1,i2,verbosity=verbosity,kind=kind,**kwargs) plt.contourf(d,w,np.sqrt(csq),cmap=self._choose_from_list(cmaps,k),alpha=self._choose_from_list(alphas,k),levels=[0,1,2,3]) plt.contour(d,w,np.sqrt(csq),colors=self._choose_from_list(colors,k),alpha=1,levels=[0,1,2,3]) plt.plot([],[],'-',color=self._choose_from_list(colors,k),label=labels[k]) plt.xlabel(self.parameter_labels()[i1]) plt.ylabel(self.parameter_labels()[i2]) plt.grid(True,alpha=0.25) if (not (np.array(labels)==None).all()) : plt.legend() return plt.gcf(),plt.gca()
[docs] def plot_triangle_forecast(self,obs,p,ilist=None,kind='marginalized',labels=None,colors=None,cmaps=None,alphas=0.75,fig=None,axes=None,figsize=None,axis_location=None,verbosity=0,**kwargs) : """ Generate a triangle plot of joint posteriors for a selected list of parameter indexes. Args: obs (list,Obsdata): An Obsdata object or list of Obsdata objects containing the observation particulars. p (list): List of parameter values at which to compute uncertainties. ilist (list): List of indicies of parameters to include in triangle plot. If None, includes all parameters. Default: None. kind (str): Choice of what to do with other parameters. Choices are 'marginalized', 'fixed'. Default: 'marginalized'. labels (list,str): A list of labels for each Obsdata object. When fewer labels than observations are provided, the first set of observations are labeled. Default: None. colors (list,str): A color or list of colors for the plots. When fewer colors than observations are provided, they will be cycled through. If None, a default list will be used. Default: None. cmaps (list,matplotlib.colors.Colormap): A colormap or list of colormaps for the plots. When fewer colormaps than observations are provided, they will be cycled through. If None, a default list will be used. Default: None. alphas (list,float): An alpha value or list of alpha values for the filled portion of the plots. Default: 0.75. fig (matplotlib.figure.Figure): Figure on which to place plot. If None, a new figure will be created. Default: None. axes (matplotlib.axes.Axes): Axes on which to place plot. If None, a new axes will be created. Default: None. figsize (list): Figure size in inches. If None, attempts a guess. Default: None. axis_location (list): Location parameters for triangle plot region. If None, attempts a guess. Default: None. verbosity (int): Verbosity level. Default: 0. Returns: (matplotlib.figure.Figure, dict): Handles to the figure and dictionary of axes objects in the plot. """ if (isinstance(obs,list)) : obslist = obs else : obslist = [obs] if (ilist is None) : ilist = np.arange(self.size) if (labels is None) : labels = len(obslist)*[None] elif (not isinstance(labels,list)) : labels = [labels] if (len(labels)<len(obslist)) : raise(RuntimeError("Label list must have the same size as the observation list.")) if (colors is None) : colors = self.default_color_list elif (isinstance(colors,list)) : pass else : colors = [colors] if (cmaps is None) : cmaps = self.default_cmap_list elif (isinstance(cmaps,list)) : pass else : cmaps = [cmaps] if (isinstance(alphas,list)) : pass else : alphas = [alphas] if (fig is None) : plt.figure(figsize=(len(ilist)*2.5,len(ilist)*2.5)) else : plt.scf(fig) if (figsize is None) : figsize = plt.gcf().get_size_inches() else : plg.gcf().set_size_inches(figsize) if (axis_location is None) : lmarg = 0.625 # Margin in inches rmarg = 0.625 # Margin in inches tmarg = 0.625 # Margin in inches bmarg = 0.625 # Margin in inches ax0 = lmarg/figsize[0] ay0 = bmarg/figsize[1] axw = (figsize[0]-lmarg-rmarg)/figsize[0] ayw = (figsize[1]-tmarg-bmarg)/figsize[1] axis_location = [ax0, ay0, axw, ayw] # Number of rows/columns nrow = len(ilist) # Make axes dictionary if it doesn't exist already if (axes is None) : # Get window size details gutter_size = 0.0625 # Gutter in inches x_gutter = gutter_size/figsize[0] y_gutter = gutter_size/figsize[1] x_window_size = (axis_location[2] - (nrow-1)*x_gutter)/float(nrow) y_window_size = (axis_location[3] - (nrow-1)*y_gutter)/float(nrow) axes = {} for j in range(nrow) : for i in range(j+1) : # Find axis location with various gutters, etc. x_window_start = axis_location[0] + i*(x_gutter+x_window_size) y_window_start = axis_location[1] + axis_location[3] - y_window_size - j*(y_gutter+y_window_size) axes[i,j] = plt.axes([x_window_start, y_window_start, x_window_size, y_window_size]) # Run over panels and plot xlim_dict = {} for k,obs in enumerate(obslist) : if (kind=='marginalized') : Sigma = self.marginalized_uncertainties(obs,p,verbosity=verbosity,**kwargs) elif (kind=='fixed') : Sigma = self.uniparameter_uncertainties(obs,p,verbosity=verbosity,**kwargs) else : raise(RuntimeError("Received unexpected value for kind, %s. Allowed values are 'fixed' and 'marginalized'."%(kind))) # print("Sigma before:",Sigma) for jj in range(len(Sigma)) : if (np.isnan(Sigma[jj]) or np.isinf(Sigma[jj])) : Sigma[jj] = 999.0 # print("Sigma after: ",Sigma) for j in range(nrow) : jj = ilist[j] xtmp = np.linspace(-3.5*Sigma[jj],3.5*Sigma[jj],256) ytmp = np.exp(-xtmp**2/(2.0*Sigma[jj]**2))/np.sqrt(2.0*np.pi*Sigma[jj]**2) axes[j,j].fill_between(xtmp,ytmp,y2=0,color=self._choose_from_list(colors,k),alpha=self._choose_from_list(alphas,k)/3.0) axes[j,j].plot(xtmp,ytmp,color=self._choose_from_list(colors,k)) if (k==0) : xlim_dict[j] = (-3.5*Sigma[jj],3.5*Sigma[jj]) else : if (3.5*Sigma[jj]>xlim_dict[j][1]) : xlim_dict[j] = (-3.5*Sigma[jj],3.5*Sigma[jj]) for j in range(nrow) : for i in range(j) : # ii = ilist[ni-i] ii = ilist[i] jj = ilist[j] plt.sca(axes[i,j]) p1,p2,csq = self.joint_biparameter_chisq(obs,p,ii,jj,kind=kind,verbosity=verbosity) plt.contourf(p1,p2,np.sqrt(csq),cmap=self._choose_from_list(cmaps,k),alpha=self._choose_from_list(alphas,k),levels=[0,1,2,3]) plt.contour(p1,p2,np.sqrt(csq),colors=self._choose_from_list(colors,k),alpha=1,levels=[0,1,2,3]) plt.xlim(xlim_dict[i]) plt.ylim(xlim_dict[j]) plt.grid(True,alpha=0.25) # plt.text(0.05,0.05,'%i,%i / %i,%i'%(i,j,ii,jj),transform=plt.gca().transAxes) for j in range(nrow) : axes[j,j].set_xlim(xlim_dict[j]) axes[j,j].set_ylim(bottom=0) axes[j,j].set_yticks([]) for j in range(nrow-1) : for i in range(j+1) : axes[i,j].set_xticklabels([]) for j in range(nrow) : for i in range(1,j+1) : axes[i,j].set_yticklabels([]) for j in range(1,nrow) : axes[0,j].set_ylabel(self.parameter_labels()[ilist[j]]) for i in range(nrow) : axes[i,nrow-1].set_xlabel(self.parameter_labels()[ilist[i]]) # Make axis for labels if (not (np.array(labels)==None).all()) : plt.axes([0.95,0.95,0.01,0.01]) for k in range(len(obslist)) : plt.plot([],[],color=self._choose_from_list(colors,k),label=labels[k]) plt.gca().spines.right.set_visible(False) plt.gca().spines.left.set_visible(False) plt.gca().spines.top.set_visible(False) plt.gca().spines.bottom.set_visible(False) plt.gca().set_xticks([]) plt.gca().set_yticks([]) plt.legend(loc='upper right',bbox_to_anchor=(0,0)) return plt.gcf(),axes
def _condition_vM(self,v,M) : """ A helper function that conditions the vector and matrix associated with marginalization. This conditioning does not change v.M^{-1}.v, but does ensure that all quantities in the matrix multiplication and matrix inversion are of similar size. It cannot address numerical instability caused by strongly correlated or nearly degenerate M. The components of the marginalization process assume a structure for the covariance that looks like: ( N v.T ) ( v M ) So, if N is an nxn array and M is an mxm array, v should be an mxn array. The conditioning procedure is equivalent to normalizing the parameters that are being marginalized by the square roots of their variances. Args: v (nd.array): Vector or matrix associated with the correlations between the parameters to be marginalized over and those to be retained. M (nd.array): Covariance of the marginalized parameters. Returns: (nd.array,nd.array): Conditioned v and M. """ # Conditions v and M to improve inversion and double-dot product performance il = np.arange(M.shape[0]) dd = 1.0/np.sqrt(M[il,il]) for i in il : M[i,:] = M[i,:]*dd M[:,i] = M[:,i]*dd if (len(v.shape)==1) : v = dd*v else : for i in np.arange(v.shape[1]) : v[:,i] = v[:,i]*dd return v,M def _choose_from_list(self,v,i) : """ A helper function to cyclicly select from a list. Args: v (list): A list. i (int): An integer index. Returns: (v element type): Element of v at index i%len(v). """ return v[i%len(v)]
# if __name__ == "__main__" : # # Example # # Make Fisher forecast object # ff1 = FF_splined_raster(5,60) # ff2 = FF_smoothed_delta_ring() # ff = FF_sum([ff1,ff2]) # # Read in some data # obs_ngeht = eh.obsdata.load_uvfits('../data/M87_230GHz_40uas.uvfits') # obs_ngeht.add_scans() # obs_ngeht = obs_ngeht.avg_coherent(0,scan_avg=True) # obs_ngeht = obs_ngeht.add_fractional_noise(0.01) # # # obs = obs_ngeht.flag_sites(['ALMA','APEX','JCMT','SMA','SMT','LMT','SPT','PV','PDB','KP']) # obslist = [obs,obs_ngeht] # labels = ['ngEHT w/o EHT','Full ngEHT'] # # Choose a default image # p = np.zeros(ff.size) # rad2uas = 3600e6*180/np.pi # for j in range(ff1.npx) : # for i in range(ff1.npx) : # p[i+ff1.npx*j] = -((ff1.xcp[i,j]-5.0/rad2uas)**2+ff1.ycp[i,j]**2)*rad2uas**2/(2.0*25.0**2) + np.log( 1.0/( 2.0*np.pi * (25.0/rad2uas)**2 ) ) # p[-5] = 0.2 # p[-4] = 40 # p[-3] = 2 # p[-2] = 0 # p[-1] = 0 # p = np.array(p) # # 1D Diameter # plot_1d_forecast(ff,p,ff.size-4,obslist,labels=labels) # plt.savefig('ring_forecast_1d.png',dpi=300) # # 2D diameter vs width # plot_2d_forecast(ff,p,ff.size-4,ff.size-3,obslist,labels=labels) # plt.savefig('ring_forecast_2d.png',dpi=300) # # Triangle # plist = np.arange(len(p)-5,len(p)) # plot_triangle_forecast(ff,p,plist,obslist,labels=labels) # plt.savefig('ring_forecast_tri.png',dpi=300) # # plt.show()