# Copyright (C) 2017, 2018 Luiz Felippe S. Rodrigues <luiz.rodrigues@ncl.ac.uk>
#
# This file is part of GalMag.
#
# GalMag is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GalMag is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GalMag. If not, see <http://www.gnu.org/licenses/>.
#
"""
Contains a class for computing a selection of observables
"""
from B_generators.B_generator import B_generator
import electron_profiles as prof
import numpy as np
import util
[docs]class Observables(B_generator):
"""
Synchrotron emmission and Faraday rotation observables
Class properties compute and return a range of observables computed along
the axis specified in the initialization.
Parameters
----------
B_field : B_field
The magnetic field based on which the observables should be computed.
At the moment, only B_field constructed using a uniform cartesian grid
is supported
direction : str
The coordinate axis parallel to the line-of-sight (i.e. the axis along
which the integrations are performed).
Valid values: 'x'/'edge-on', 'y' or 'z'/'face-on'.
obs_electron_density_function : func
A function which receives three coordinate arrays r_spherical, theta, phi
and returns the electron density
obs_cosmic_ray_function : func
A function which receives three coordinate arrays r_spherical, theta, phi
and returns the cosmic ray density
obs_wavelength_m : float
The wavelength of used in the synchrotron emission calculations.
Default: 5 cm
obs_emissivivty_normalization : float
Needs to be adjusted
"""
def __init__(self, B_field, default_parameters={},
dtype=np.float64, direction=None, **kwargs):
if B_field.grid.grid_type != 'cartesian':
raise NotImplementedError, 'At the moment, only cartesian grids are supported.'
self.B_field = B_field
resolution = B_field.resolution.copy()
self.direction = direction
if direction == 'x' or direction == 'edge-on':
self._integration_axis = 0
self._Bp = self.B_field.x
self._depths = self.B_field.grid.x[:,0,0]
elif direction == 'y':
self._integration_axis = 1
self._Bp = self.B_field.y
self._depths = self.B_field.grid.y[0,:,0]
elif direction == 'z' or direction == 'face-on':
self._integration_axis = 2
self._Bp = self.B_field.z
self._depths = self.B_field.grid.z[0,0,:]
else:
raise NotImplementedError, 'Only "x", "y" and "z" directions are currently supported."'
self._ddepth = np.abs(self._depths[0]-self._depths[1])
resolution[self._integration_axis] = 1
super(Observables, self).__init__(box=B_field.box,
resolution=resolution,
default_parameters=default_parameters,
dtype=dtype)
# Reads in the parameters
self.parameters = self._parse_parameters(kwargs)
# Cached quantities dictionary
self._cache = {}
@property
def _builtin_parameter_defaults(self):
builtin_defaults = {
'obs_electron_density_function': prof.simple_ne, # n_e [cm^-3]
'obs_cosmic_ray_function': prof.constant_ncr, # n_{cr} [cm^-3]
'obs_wavelength_m': 5e-2, # 1 cm
'obs_gamma': 1.0, # cm
'obs_emissivivty_normalization': 1, # This needs to be updated
}
return builtin_defaults
[docs] def get_B_field(self):
"""B_field based on which this Observables object was constructed"""
return self.B_field
@property
def synchrotron_emissivity(self):
r"""
Synchrotron emissivity along a coordinate axis
.. math::
\epsilon = C\, n_{cr} B_\perp^{(\gamma+1)/2)} \lambda^{(\gamma-1)/2}
Returns
-------
3D-d2o
Syncrotron emmissivity, along a coordinate axis
"""
if 'synchrotron_emissivity' not in self._cache:
lamb = self.parameters['obs_wavelength_m']
gamma = self.parameters['obs_gamma']
norm = self.parameters['obs_emissivivty_normalization']
self._cache['synchrotron_emissivity'] = \
self._compute_synchrotron_emissivity(lamb, gamma, norm)
return self._cache['synchrotron_emissivity']
def _compute_synchrotron_emissivity(self, lamb, gamma, norm):
r"""
Helper for synchrotron_emissivity
Parameters
----------
lamb : float
wavelength
gamma : float
spectral index of the cosmic ray energy distribution
norm : float
normalization of the emissivity
Returns
-------
synchrotron emissivity (along a given axis)
"""
if self.direction == 'x':
Bperp2 = self.B_field.y**2 + self.B_field.z**2
elif self.direction == 'y':
Bperp2 = self.B_field.x**2 + self.B_field.z**2
elif self.direction == 'z':
# Bperp^2 = Bx^2 + By^2
Bperp2 = self.B_field.x**2 + self.B_field.y**2
ncr = self.parameters['obs_cosmic_ray_function'](
self.B_field.grid.r_spherical,
self.B_field.grid.theta,
self.B_field.grid.phi)
return ncr * Bperp2**((gamma+1)/4) * lamb**((gamma-1)/2)
@property
def intrinsic_polarization_degree(self):
r"""
Intrinsic degree of polarization
.. math::
p_0 = \frac{\gamma+1}{\gamma + 7/3}
"""
if 'intrinsic_polarization_degree' not in self._cache:
gamma = self.parameters['obs_gamma']
self._cache['intrinsic_polarization_degree']=(gamma+1.0)/(gamma+7./3.)
return self._cache['intrinsic_polarization_degree']
@property
def intrinsic_polarization_angle(self):
r"""
Intrinsic polarization angle
.. math::
\psi_0 = \frac{\pi}{2} + \tan^{-1}\left(\frac{B_y}{B_x}\right)
"""
if 'intrinsic_polarization_angle' not in self._cache:
if self.direction == 'x':
B1 = self.B_field.z
B2 = self.B_field.y
elif self.direction == 'y':
B1 = self.B_field.x
B2 = self.B_field.z
elif self.direction == 'z':
B1 = self.B_field.y
B2 = self.B_field.x
else:
raise ValueError
psi0 = np.pi/2.0 + util.arctan2(B1,B2)
psi0[psi0>np.pi] = psi0[psi0>np.pi]-2.*np.pi
self._cache['intrinsic_polarization_angle'] = psi0
return self._cache['intrinsic_polarization_angle']
@property
def electron_density(self):
r"""
Thermal electron density evaluated on this grid used for calculations.
This is set through the parameter obs_electron_density_function,
chosen during initialization.
"""
if 'electron_density' not in self._cache:
# Gets local grid (aka beginning of d2o gymnastics)
local_r_sph_grid = self.B_field.grid.r_spherical.get_local_data()
local_theta_grid = self.B_field.grid.theta.get_local_data()
local_phi_grid = self.B_field.grid.phi.get_local_data()
# Evaluate obs_electron_density_function on the local grid
local_ne = self.parameters['obs_electron_density_function'](
local_r_sph_grid,
local_theta_grid,
local_phi_grid)
# Initializes global array and set local data into a d2o
global_ne = self.B_field.grid.get_prototype(dtype=self.dtype)
global_ne.set_local_data(local_ne, copy=False)
self._cache['electron_density'] = global_ne
return self._cache['electron_density']
@property
def psi(self):
r"""
Polarization angle of radiation emitted at a given depth Faraday
rotated over the line of sight.
.. math::
\psi(z) = \psi_0(z)
+ 0.81\,{\rm rad}\left(\frac{\lambda}{1\,\rm m}\right)^2 \int_z^\infty
\left(\frac{n_e(z')}{1\,\rm cm^{-3}}\right)
\left(\frac{B_\parallel(z')}{1\,\mu\rm G}\right)
\frac{{\rm d} z'}{1\,\rm pc}
"""
if 'psi' not in self._cache:
lamb = self.parameters['obs_wavelength_m']
ne = self.electron_density
self._cache['psi'] = self._compute_psi(lamb, ne)
return self._cache['psi']
def _compute_psi(self, lamb, ne, from_bottom=False):
"""Computes the Faraday rotated polarization angle of radiation emmited
at each depth
Parameters
----------
lamb : float
Wavelength used for the computation (in meters)
ne : 3D d2o
Array containing the electron density in the galaxy
from_bottom : bool
Whether the observation is done "from bottom" (integration starting
from negative values) or "from top" (positive). Default: False
"""
Bp = self._Bp
ddepth = self._ddepth * 1000
axis = [slice(None),slice(None),slice(None)]
ax_n = self._integration_axis
slices = [slice(None),slice(None),slice(None)]
psi = self.intrinsic_polarization_angle.copy()
for i, depth in enumerate(self._depths):
axis[ax_n] = slice(i,i+1) # e.g. for z, this will select psi[:,:,i]
# The observer can be at the top or bottom
if not from_bottom:
# Will integrate from 0 to i; e.g. for z, Bp[:,:,0:i]
slices[ax_n] = slice(0,i)
else:
# Will integrate from i to the end
slices[ax_n] = slice(i,-1)
integrand = ne[slices] * Bp[slices] * ddepth
integral = integrand.sum(axis=ax_n)
# Adjust the axes and uses full data (to make sure to avoid mistakes)
integral = np.expand_dims(integral.get_full_data(), ax_n)
# psi(z) = psi0(z) + 0.81\lambda^2 \int_-\infty^z ne(z') Bpara(z') dz'
psi[axis] += 0.81*lamb**2 * integral
return psi
@property
def Stokes_I(self):
r"""
Stokes parameter I (total emmission)
.. math::
I(\lambda) = \int_{-\infty}^\infty \epsilon(w,\lambda) dw
where :math:`w` is the specified coordinate axis.
"""
if 'Stokes_I' not in self._cache:
self._cache['Stokes_I'] = self._compute_Stokes('I')
return self._cache['Stokes_I']
@property
def Stokes_Q(self):
r"""
Stokes parameter Q
.. math::
Q(\lambda) = \int_{-\infty}^\infty \epsilon(w,\lambda) p_0(w) \cos[2 \psi(w)] dw
where :math:`w` is the specified coordinate axis.
"""
if 'Stokes_Q' not in self._cache:
self._cache['Stokes_Q'] = self._compute_Stokes('Q')
return self._cache['Stokes_Q']
@property
def Stokes_U(self):
r"""
Stokes parameter U
.. math::
U(\lambda) = \int_{-\infty}^\infty \epsilon(w,\lambda) p_0(w) \sin[2 \psi(w)] dw
where :math:`w` is the specified coordinate axis.
"""
if 'Stokes_U' not in self._cache:
self._cache['Stokes_U'] = self._compute_Stokes('U')
return self._cache['Stokes_U']
def _compute_Stokes(self, parameter):
r"""
Computes Stokes parameters I, Q, U
.. math::
I(\lambda) = \int_{-\infty}^\infty \epsilon(w,\lambda) dw
.. math::
Q(\lambda) = \int_{-\infty}^\infty \epsilon(w,\lambda) p_0(w) \cos[2 \psi(w)] dw
.. math::
U(\lambda) = \int_{-\infty}^\infty \epsilon(w,\lambda) p_0(w) \sin[2 \psi(w)] dw
where :math:`w` is the specified coordinate axis.
Parameters
----------
parameter : str
Either 'I', 'Q' or 'U'
Returns
-------
The specified Stokes parameter
"""
emissivity = self.synchrotron_emissivity
# Computes the integrand
if parameter == 'I':
integrand = emissivity * self._ddepth
elif parameter == 'Q':
p0 = self.intrinsic_polarization_degree
cos2psi = util.distribute_function(np.cos, 2.0*self.psi)
integrand = emissivity * p0 * cos2psi * self._ddepth
elif parameter == 'U':
p0 = self.intrinsic_polarization_degree
sin2psi = util.distribute_function(np.sin, 2.0*self.psi)
integrand = emissivity * p0 * sin2psi * self._ddepth
else:
raise ValueError
# Sums/integrates along the specified axis and returns
return integrand.sum(axis=self._integration_axis)
@property
def observed_polarization_angle(self):
r"""
Observed integrated polarization angle
.. math::
\Psi = \frac{1}{2} \arctan\left(\frac{U}{Q}\right)
"""
if 'observed_polarization_angle' not in self._cache:
angle = 0.5 * util.arctan2(self.Stokes_U,self.Stokes_Q)
self._cache['observed_polarization_angle'] = angle
return self._cache['observed_polarization_angle']
@property
def polarized_intensity(self):
r"""
Polarized intensity
.. math::
P = \sqrt{Q^2 + U^2}
"""
if 'polarized_intensity' not in self._cache:
P = (self.Stokes_U**2 + self.Stokes_Q**2 )**0.5
self._cache['polarized_intensity'] = P
return self._cache['polarized_intensity']