# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""activity.gas core
"""
__all__ = [
'photo_lengthscale',
'photo_timescale',
'fluorescence_band_strength',
'Haser',
'VectorialModel'
]
from abc import ABC, abstractmethod
from distutils.log import warn
import warnings
import numpy as np
import astropy.units as u
try:
import scipy
from scipy import special
from scipy.integrate import quad, dblquad, romberg
from scipy.interpolate import CubicSpline
except ImportError:
scipy = None
from ... import bib
from ... import data as sbd
from ... import units as sbu
from ...exceptions import RequiredPackageUnavailable, TestingNeeded
from .. core import (Aperture, RectangularAperture, GaussianAperture,
AnnularAperture, CircularAperture)
[docs]def photo_lengthscale(species, source=None):
"""Photodissociation lengthscale for a gas species.
Parameters
----------
species : string
The species to look up.
source : string, optional
Retrieve values from this source (case insensitive). See
references for keys.
Returns
-------
gamma : `~astropy.units.Quantity`
The lengthscale at 1 au.
Examples
--------
>>> from sbpy.activity import photo_lengthscale
>>> gamma = photo_lengthscale('OH')
References
----------
[CS93] H2O and OH from Table IV of Cochran & Schleicher 1993,
Icarus 105, 235-253. Quoted for intermediate solar activity.
"""
from .data import photo_lengthscale as data
default_sources = {
'H2O': 'CS93',
'OH': 'CS93',
}
if species not in data:
summary = ''
for k, v in sorted(data.items()):
summary += '\n{} [{}]'.format(k, ', '.join(v.keys()))
raise ValueError(
'Invalid species {}. Choose from:{}'
.format(species, summary))
gas = data[species]
source = default_sources[species] if source is None else source
if source not in gas:
raise ValueError(
'Source key {} not available for {}. Choose from: {}'
.format(source, species, ', '.join(gas.keys())))
gamma, bibcode = gas[source]
bib.register(photo_lengthscale, bibcode)
return gamma
[docs]def photo_timescale(species, source=None):
"""Photodissociation timescale for a gas species.
Parameters
----------
species : string
Species to look up.
source : string, optional
Retrieve values from this source. See references for keys.
Returns
-------
tau : `~astropy.units.Quantity`
The timescale at 1 au. May be a two-element array: (quiet Sun,
active Sun).
Examples
--------
>>> from sbpy.activity import photo_timescale
>>> tau = photo_timescale('OH')
References
----------
[CS93] Table IV of Cochran & Schleicher 1993, Icarus 105, 235-253.
Quoted for intermediate solar activity.
[C94] Crovisier 1994, JGR 99, 3777-3781.
[CE83] Crovisier & Encrenaz 1983, A&A 126, 170-182.
[H92] Huebner et al. 1992, Astroph. & Space Sci. 195, 1-294.
"""
from .data import photo_timescale as data
default_sources = {
'H2O': 'CS93',
'OH': 'CS93',
'HCN': 'C94',
'CH3OH': 'C94',
'H2CO': 'C94',
'CO2': 'CE83',
'CO': 'CE83',
'CN': 'H92'
}
if species not in data:
summary = ''
for k, v in sorted(data.items()):
summary += '\n{} [{}]'.format(k, ', '.join(v.keys()))
raise ValueError(
"Invalid species {}. Choose from:{}"
.format(species, summary))
gas = data[species]
source = default_sources[species] if source is None else source
if source not in gas:
raise ValueError(
'Source key {} not available for {}. Choose from: {}'
.format(source, species, ', '.join(gas.keys())))
tau, bibcode = gas[source]
bib.register(photo_timescale, bibcode)
return tau
[docs]@sbd.dataclass_input(eph=sbd.Ephem)
def fluorescence_band_strength(species, eph=None, source=None):
"""Fluorescence band strength.
Parameters
----------
species : string
Species to look up.
eph : `~astropy.units.Quantity`, `~sbpy.data.Ephem` or `dict` optional
The target ephemeris. The strength is scaled to the given
heliocentric distance, if present. Some species require
heliocentric radial velocity ('rdot').
source : string, optional
Retrieve values from this source (case insensitive). See
references for keys.
Returns
-------
LN : `~astropy.units.Quantity`
Luminosity per molecule, scaled to rh, if provided.
Examples
--------
>>> import astropy.units as u
>>> from sbpy.activity import fluorescence_band_strength
>>>
>>> eph = {'rh': 1 * u.au, 'rdot': -1 * u.km / u.s}
>>> LN = fluorescence_band_strength('OH 0-0', eph, 'SA88')
>>> print(LN) # doctest: +FLOAT_CMP
[1.54e-15] erg / s
"""
from .data import fluorescence_band_strength as data
default_sources = {
'OH 0-0': 'SA88',
'OH 1-0': 'SA88',
'OH 1-1': 'SA88',
'OH 2-2': 'SA88',
'OH 0-1': 'SA88',
'OH 0-2': 'SA88',
'OH 2-0': 'SA88',
'OH 2-1': 'SA88',
}
if species not in data:
raise ValueError(
'No data available for {}. Choose one of: {}'
.format(species, ', '.join(data.keys())))
band = data[species]
source = default_sources[species] if source is None else source
if source not in band:
raise ValueError(
'No source {} for {}. Choose one of: {}'
.format(source, species, ', '.join(band.keys())))
LN, note, bibcode = band[source]
if bibcode is not None:
bib.register(fluorescence_band_strength, bibcode)
return LN(eph)
class GasComa(ABC):
"""Abstract base class for gas coma models.
Parameters
----------
Q : `~astropy.units.Quantity`
Production rate, number per time.
v : `~astropy.units.Quantity`
Radial outflow speed, distance per time.
"""
@u.quantity_input(Q=(u.s ** -1, u.mol / u.s), v=u.m / u.s)
def __init__(self, Q, v):
self.Q = Q
self.v = v
@u.quantity_input(r=u.m)
def volume_density(self, r):
"""Coma volume density.
Parameters
----------
r : `~astropy.units.Quantity`
Linear distance to the nucleus.
Returns
-------
n : `~astropy.units.Quantity`
Local number density.
"""
return self._volume_density(r.to_value('m')) / u.m ** 3
@sbd.dataclass_input(eph=sbd.Ephem)
@sbd.quantity_to_dataclass(eph=(sbd.Ephem, 'delta'))
def column_density(self, rho, eph=None):
"""Coma column density at a projected distance from nucleus.
Parameters
----------
rho : `~astropy.units.Quantity`
Projected distance to the region of interest on the plane
of the sky in units of length or angle.
eph : dictionary-like, `~sbpy.data.Ephem`, `~astropy.units.Quantity`, optional
Target-observer distance, or ephemeris with ``'delta'``
field. Required to convert rho to a projected size.
Returns
-------
sigma : `~astropy.units.Quantity`
Coma column density along the line of sight at a distance
rho.
"""
equiv = []
if eph is not None:
equiv = sbu.projected_size(eph)
rho = rho.to_value('m', equiv)
return self._column_density(rho) / u.m ** 2
@sbd.dataclass_input(eph=sbd.Ephem)
@sbd.quantity_to_dataclass(eph=(sbd.Ephem, 'delta'))
def total_number(self, aper, eph=None):
"""Total number of molecules in aperture.
Parameters
----------
aper : `~astropy.units.Quantity`, `~sbpy.activity.Aperture`
Observation aperture. May be a circular aperture radius
with units of length or angle.
eph : dictionary-like, `~sbpy.data.Ephem`, `~astropy.units.Quantity`
Target-observer distance, or ephemeris with `delta`.
Required if the aperture has angular units.
Returns
-------
N : float
Total number of molecules within the aperture.
"""
if not isinstance(aper, Aperture):
aper = CircularAperture(aper)
if eph is not None:
aper = aper.as_length(eph)
return self._total_number(aper)
def _total_number(self, aper):
"""Total number of molecules in aperture.
Sub-classes of ``GasComa`` may override this method, instead of
``total_number``, which avoids reusing the boiler plate aperture
conversions.
Parameters
----------
aper : `~sbpy.activity.Aperture`
Observation aperture in units of length.
"""
return self._integrate_column_density(aper)[0]
@abstractmethod
def _volume_density(self, r):
"""Unitless volume density function.
Parameters
----------
r : float
Linear distance to the nucleus in meters.
Returns
-------
n : float
Local number density in inverse cubic-meters.
"""
@abstractmethod
def _column_density(self, rho):
"""Unitless column density function.
Parameters
----------
rho : float
Projected distance of the region of interest on the plane
of the sky in units of meters.
Returns
-------
sigma : float
Coma column density along the line of sight at a distance
rho in units of inverse square-meters.
"""
def _integrate_volume_density(self, rho, epsabs=1.49e-8):
"""Integrate volume density along the line of sight.
Parameters
----------
rho : float
Projected distance of the region of interest on the plane of
the sky in units of meters
epsabs : float, int, optional
Absolute and relative error tolerance for integrals. See
`scipy.integrate.quad`.
Returns
-------
sigma : float
Coma column density along ``rho`` in units of inverse
square-meters.
err : float
Estimated integration error.
"""
if not scipy:
raise RequiredPackageUnavailable('scipy')
def f(s, rho2):
r = np.sqrt(rho2 + s ** 2)
return self._volume_density(r)
# quad diverges integrating to infinity, but 1e6 × rho is good
# enough
limit = 30
points = rho * np.logspace(-4, 4, limit // 2)
sigma, err = quad(f, 0, 1e6 * rho, args=(rho ** 2,),
limit=limit, points=points, epsabs=epsabs)
# spherical symmetry
sigma *= 2
err *= 2
return sigma, err
def _integrate_column_density(self, aper, epsabs=1.49e-8):
"""Integrate column density over an aperture.
Parameters
----------
aper : `~sbpy.activity.Aperture`
Aperture, in units of length.
epsabs : float, int, optional
Absolute and relative error tolerance for integrals. See
`scipy.integrate.quad` (circular, annular, Gaussian) and
`~scipy.integrate.dblquad` (rectangular) for details.
Returns
-------
N : float
Total number.
err : float
Estimated integration error.
"""
if not scipy:
raise RequiredPackageUnavailable('scipy')
if isinstance(aper, (CircularAperture, AnnularAperture)):
if isinstance(aper, CircularAperture):
limits = (0, aper.radius.to_value('m'))
else:
limits = aper.shape.to_value('m')
# integrate in polar coordinates
def f(rho):
"""Column density integration in polar coordinates.
rho in m, column_density in m**-2
"""
return rho * self._column_density(rho)
N, err = quad(f, *limits, epsabs=epsabs)
N *= 2 * np.pi
err *= 2 * np.pi
elif isinstance(aper, RectangularAperture):
shape = aper.shape.to_value('m')
def f(rho, th):
"""Column density integration in polar coordinates.
rho in m, column_density in m**-2
th is ignored (azimuthal symmetry)
"""
return rho * self._column_density(rho)
# first "octant"; rho1 and rho2 are the limits of the
# integration
def rho1(th):
"Lower limit"
return 0
def rho2(th):
"Upper limit (a line)"
return shape[0] / 2 / np.cos(th)
th = np.arctan(shape[1] / shape[0])
N1, err1 = dblquad(f, 0, th, rho1, rho2, epsabs=epsabs)
# second "octant"
def rho2(th):
"Upper limit (a line)"
return shape[1] / 2 / np.cos(th)
th = np.arctan(shape[0] / shape[1])
N2, err2 = dblquad(f, 0, th, rho1, rho2, epsabs=epsabs)
# N1 + N2 constitute 1/4th of the rectangle
N = 4 * (N1 + N2)
err = 4 * (err1 + err2)
elif isinstance(aper, GaussianAperture):
# integrate in polar coordinates
def f(rho, sigma):
"""Column density integration in polar coordinates.
rho and sigma in m, column_density in m**-2
"""
return (rho * np.exp(-rho ** 2 / sigma ** 2 / 2)
* self._column_density(rho))
sigma = aper.sigma.to_value('m')
N, err = quad(f, 0, np.inf, args=(sigma,), epsabs=epsabs)
N *= 2 * np.pi
err *= 2 * np.pi
return N, err
[docs]class Haser(GasComa):
"""Haser coma model.
Some functions require `scipy`.
Parameters
----------
Q : `~astropy.units.Quantity`
Production rate, per time.
v : `~astropy.units.Quantity`
Radial outflow speed, distance per time.
parent : `~astropy.units.Quantity`
Coma lengthscale of the parent species.
daughter : `~astropy.units.Quantity`, optional
Coma lengthscale of the daughter species.
References
----------
Haser 1957, Bulletin de la Societe Royale des Sciences de Liege
43, 740.
Newburn and Johnson 1978, Icarus 35, 360-368.
"""
@bib.cite({'model': '1957BSRSL..43..740H'})
@u.quantity_input(parent=u.m, daughter=u.m)
def __init__(self, Q, v, parent, daughter=None):
super().__init__(Q, v)
self.parent = parent
self.daughter = daughter
def _volume_density(self, r):
n = (self.Q / self.v).to_value('1/m') / r ** 2 / 4 / np.pi
parent = self.parent.to_value('m')
if self.daughter is None or self.daughter == 0:
# parent only
n *= np.exp(-r / parent)
else:
daughter = self.daughter.to_value('m')
n *= (daughter / (parent - daughter)
* (np.exp(-r / parent) - np.exp(-r / daughter)))
return n
def _iK0(self, x):
"""Integral of the modified Bessel function of 2nd kind, 0th order."""
if not scipy:
raise RequiredPackageUnavailable('scipy')
return special.iti0k0(x)[1]
def _K1(self, x):
"""Modified Bessel function of 2nd kind, 1st order."""
if not scipy:
raise RequiredPackageUnavailable('scipy')
return special.k1(x)
@bib.cite({'model': '1978Icar...35..360N'})
def _column_density(self, rho):
sigma = (self.Q / self.v).to_value('1/m') / rho / 2 / np.pi
parent = self.parent.to_value('m')
if self.daughter is None or self.daughter == 0:
sigma *= np.pi / 2 - self._iK0(rho / parent)
else:
daughter = self.daughter.to_value('m')
sigma *= (daughter / (parent - daughter)
* (self._iK0(rho / daughter) - self._iK0(rho / parent)))
return sigma
def _total_number(self, aper):
# Inspect aper and handle as appropriate
if isinstance(aper, (RectangularAperture, GaussianAperture)):
return self._integrate_column_density(aper)[0]
elif isinstance(aper, AnnularAperture):
N0 = self._total_number(CircularAperture(aper.shape[0]))
N1 = self._total_number(CircularAperture(aper.shape[1]))
return N1 - N0
# Solution for the circular aperture of radius rho:
bib.register(self.total_number, {'model': '1978Icar...35..360N'})
rho = aper.radius
parent = self.parent.to(rho.unit)
x = (rho / parent).to_value(u.dimensionless_unscaled)
N = (self.Q * rho / self.v).to_value(u.dimensionless_unscaled)
if self.daughter is None or self.daughter == 0:
N *= 1 / x - self._K1(x) + np.pi / 2 - self._iK0(x)
else:
daughter = self.daughter.to(rho.unit)
y = (rho / daughter).to_value('')
N *= ((daughter / (parent - daughter)).to_value('')
* (self._iK0(y) - self._iK0(x) + x ** -1 - y ** -1
+ self._K1(y) - self._K1(x)))
return N
[docs]class VectorialModel(GasComa):
""" Vectorial model for fragments in a coma produced
with a dissociative energy kick
Parameters
----------
base_q : `~astropy.units.Quantity`
Base production rate, per time
parent: `~sbpy.data.Phys`
Object with the following physical property fields:
* ``tau_T``: total lifetime of the parent molecule
* ``tau_d``: photodissociative lifetime of the parent molecule
* ``v_outflow``: outflow velocity of the parent molecule
* ``sigma``: cross-sectional area of the parent molecule
fragment: `~sbpy.data.Phys`
Object with the following physical property fields:
* ``tau_T``: total lifetime of the fragment molecule
* ``v_photo``: velocity of fragment resulting from
photodissociation of the parent
q_t: callable, optional
Calculates the parent production rate as a function of time:
``q_t(t)``. The argument ``t`` is the look-back time as a float
in units of seconds. The return value is the production rate in
units of inverse seconds. If provided, this value is added to
``base_q``.
If no time-dependence function is given, the model will run with
steady production at ``base_q`` stretching infinitely far into the
past.
radial_points: int, optional
Number of radial grid points the model will use
radial_substeps: int, optional
Number of points along the contributing axis to integrate over
angular_points: int, optional
Number of angular grid points the model will use
angular_substeps: int, optional
Number of angular steps per radial substep to integrate over
parent_destruction_level: float, optional
Model will attempt to track parents until
this percentage has dissociated
fragment_destruction_level: float, optional
Model will attempt to track fragments until
this percentage has dissociated
max_fragment_lifetimes: float, optional
Fragments traveling through the coma will be ignored if they take
longer than this to arrive and contribute to the density at any
considered point
print_progress: bool, optional
Print progress percentage while calculating
References:
The density distribution of neutral compounds in cometary
atmospheres. I - Models and equations,
Festou, M. C. 1981, Astronomy and Astrophysics, vol. 95, no. 1,
Feb. 1981, p. 69-79.
"""
@bib.cite({'model': '1981A&A....95...69F'})
@u.quantity_input(base_q=(u.s ** -1, u.mol / u.s))
def __init__(self, base_q, parent, fragment, q_t=None, radial_points=50,
radial_substeps=12, angular_points=30, angular_substeps=7,
parent_destruction_level=0.99,
fragment_destruction_level=0.95,
max_fragment_lifetimes=8.0,
print_progress=False):
warnings.warn("Literature tests with the Vectorial model are generally"
" in agreement at the 20% level or better. The cause"
" for the differences with the Festou FORTRAN code are"
" not yet precisely known. Help testing this feature is"
" appreciated.", TestingNeeded)
super().__init__(base_q, parent['v_outflow'][0])
# Calculations are done internally in meters and seconds to match the
# base GasComa class
# Convert to unitless value of production per second
self.base_q = base_q.to(1 / u.s).value
# Copy time dependence or create a steady production function
if q_t is None:
self.q_t = self._make_steady_production()
else:
self.q_t = q_t
# Copy parent info, stripping astropy units and converting to meters
# and seconds
self.parent = {
'tau_T': parent['tau_T'][0].to(u.s).value,
'tau_d': parent['tau_d'][0].to(u.s).value,
'v_outflow': parent['v_outflow'][0].to(u.m / u.s).value,
'sigma': parent['sigma'][0].to(u.m ** 2).value
}
# Same for the fragment info
self.fragment = {
'tau_T': fragment['tau_T'][0].to(u.s).value,
'v_photo': fragment['v_photo'][0].to(u.m / u.s).value
}
# Grid settings
self.radial_points = radial_points
self.radial_substeps = radial_substeps
self.angular_points = angular_points
self.angular_substeps = angular_substeps
# Helps define cutoff for radial grid at this percentage of parents
# lost to decay
self.parent_destruction_level = parent_destruction_level
# Default here is lower than parents because they are born farther from
# nucleus, tracking them too long will stretch the radial grid a bit
# too much
self.fragment_destruction_level = fragment_destruction_level
# If a fragment has to travel longer than this many lifetimes to
# contribute to the density at a point, ignore it
self.max_fragment_lifetimes = max_fragment_lifetimes
# Print progress during density calculations?
self.print_progress = print_progress
"""Initialize data structures to hold our calculations"""
self.vmodel = {}
# Calculate up a few things
self._setup_calculations()
# Build the radial grid
self.vmodel['fast_radial_grid'] = self._make_radial_logspace_grid()
self.vmodel['radial_grid'] = self.vmodel['fast_radial_grid'] * (u.m)
# Angular grid
self.vmodel['d_alpha'] = self.vmodel[
'epsilon_max'] / self.angular_points
# Make array of angles adjusted up away from zero, to keep from
# calculating a radial line's contribution to itself
self.vmodel['angular_grid'] = np.linspace(
0, self.vmodel['epsilon_max'], num=self.angular_points,
endpoint=False
)
# This maps addition over the whole array automatically
self.vmodel['angular_grid'] += self.vmodel['d_alpha'] / 2
# Makes a 2d array full of zero values
self.vmodel['density_grid'] = np.zeros((self.radial_points,
self.angular_points))
# Do the main computation
self._compute_fragment_density()
# Turn our grid into a function of r
self._interpolate_radial_density()
# Get the column density at our grid points and interpolate
self._compute_column_density()
# Count up the number of fragments in the grid versus theoretical value
self.vmodel['num_fragments_theory'] = self.calc_num_fragments_theory()
self.vmodel['num_fragments_grid'] = self.calc_num_fragments_grid()
[docs] @classmethod
def binned_production(cls, qs, fragment, parent, ts, **kwargs):
""" Alternate constructor for vectorial model
Parameters
----------
qs : `~astropy.units.Quantity`
List of steady production rates, per time, with length equal to
that of ``ts``.
parent: `~sbpy.data.Phys`
Same as __init__
fragment: `~sbpy.data.Phys`
Same as __init__
ts : `~astropy.units.Quantity`
List of times corresponding to when the production qs begin,
with positive times indicating the past.
kwargs: variable, optional
Any additional parameters in kwargs are passed on to __init__,
which are documented above and may be passed in here.
Returns
-------
VectorialModel
Instance of the VectorialModel class
Examples
--------
This specifies that from 30 days ago to 7 days ago, the production
was 1.e27, changes to 3.e27 between 7 and 5 days ago, then falls to
2.e27 from 5 days ago until now
>>> q_example = [1.e27, 3.e27, 1.e27] * (1/u.s)
>>> t_example = [30, 7, 5] * u.day
Notes
-----
Preserves Festou's original fortran method of describing time
dependence in the model - time bins of steady production at
specified intervals
The base production of the model is taken from the first element in
the production array, which assumes the arrays are time-ordered
from oldest to most recent. The base production extends backward
in time to infinity, so take care when using this method for time
dependence if that is not what is intended.
"""
return cls(base_q=qs[0],
q_t=VectorialModel._make_binned_production(qs, ts),
fragment=fragment, parent=parent, **kwargs)
def _make_binned_production(qs, ts):
""" Produces a time dependence function out of lists given to
binned_production constructor
Parameters
----------
qs : `astropy.units.Quantity`
See binned_production for description
ts : `astropy.units.Quantity`
See binned_production for description
Returns
-------
q_t : function
See __init__ for description
Notes
-----
We create a model-compatible function for time dependence out of
the information specified in the arrays qs and ts
The resulting function gives a steady specified production within
the given time windows
"""
q_invsecs = qs.to(1 / (u.s)).value
t_at_p_secs = ts.to(u.s).value
base_q = qs[0]
# extend the arrays to simplify our comparisons in binned_q_t
# last bin stops at observation time, t = 0
t_at_p_secs = np.append(t_at_p_secs, [0])
# junk value for production because this is in the future
q_invsecs = np.append(q_invsecs, [0])
def q_t(t):
# too long in the past?
if t > t_at_p_secs[0] or t < 0:
return base_q
# in the future?
if t < 0:
return 0
# loop over all elements except the last so we can always look at
# [index+1] for the comparison
for i in range(len(q_invsecs) - 1):
if t < t_at_p_secs[i] and t > t_at_p_secs[i + 1]:
return q_invsecs[i]
return q_t
def _make_steady_production(self):
""" Produces a time dependence function that contributes no extra
parents at any time
Notes
-----
If no q_t is given, we use this as our time dependence as the
model needs a q_t to run
"""
# No additional production at any time
def q_t(t):
return 0
return q_t
def _setup_calculations(self):
""" Miscellaneus calculations to inform the model later
Notes
-----
Calculates the collision sphere radius, coma radius, time to
permanent flow regime, the maximum radius our grid could possibly
need to extend out to, and the maximum angle that a fragment's
trajectory can deviate from its parent's trajectory (which is
assumed to be radial)
"""
"""
Calculate collision sphere radius based on the first production
rate, Eq. (5) in Festou 1981
Note that this is only calculated with the base production rate,
because it is assumed that the base production rate has had roughly
enough time to reach a steady state before letting production vary
with time.
"""
# This v_therm factor comes from molecular flux of ideal gas moving
# through a surface, in our case the surface of the collision sphere
# The gas is collisional inside this sphere and free beyond it, so we
# can model this as effusion
v_therm = self.parent['v_outflow'] * 0.25
q = self.base_q
vp = self.parent['v_outflow']
vf = self.fragment['v_photo']
# Eq. 5 of Festou 1981
self.vmodel['collision_sphere_radius'] = (
(self.parent['sigma'] * q * v_therm) / (vp * vp)
) * u.m
# Calculates the radius of the coma (parents only), given our input
# parameters
# NOTE: Equation (16) of Festou 1981 where alpha is the percent
# destruction of molecules
parent_beta_r = -np.log(1.0 - self.parent_destruction_level)
parent_r = parent_beta_r * vp * self.parent['tau_T']
self.vmodel['coma_radius'] = parent_r * u.m
# Calculates the time needed to hit a steady, permanent production of
# fragments
fragment_beta_r = -np.log(1.0 - self.fragment_destruction_level)
perm_flow_radius = (
self.vmodel['coma_radius'].value +
((vp + vf) * fragment_beta_r * self.fragment['tau_T'])
)
t_secs = (
self.vmodel['coma_radius'].value / vp +
(perm_flow_radius - self.vmodel['coma_radius'].value)
/ (vp + vf)
)
self.vmodel['t_perm_flow'] = (t_secs * u.s).to(u.day)
# This is the total radial size that parents & fragments occupy, beyond
# which we assume zero density
self.vmodel['max_grid_radius'] = perm_flow_radius * u.m
# Two cases for angular range of ejection of fragment based on relative
# velocities of parent and fragment species
if(vf < vp):
self.vmodel['epsilon_max'] = np.arcsin(vf / vp)
else:
self.vmodel['epsilon_max'] = np.pi
[docs] def production_at_time(self, t):
""" Get production rate at time t
Parameters
----------
t : float
Time in seconds, with positive values representing the past
Returns
-------
numpy.float64
Production rate, unitless, at the specified time
"""
return self.base_q + self.q_t(t)
def _make_radial_logspace_grid(self):
""" Create an appropriate radial grid based on the model parameters
Returns
-------
ndarray
Logarithmically spaced samples of the radial space around the
coma, out to a maximum distance
Notes
-----
Creates a grid (in meters) with numpy's logspace function that
covers the expected radial size, stretching from 2 times the
collision sphere radius (near the nucleus be dragons) out to the
calculated max. If we get too close to the nucleus things go very
badly so don't do it, dear reader
"""
start_power = np.log10(
self.vmodel['collision_sphere_radius'].value * 2
)
end_power = np.log10(self.vmodel['max_grid_radius'].value)
return np.logspace(
start_power, end_power,
num=self.radial_points, endpoint=True
)
def _compute_fragment_density(self):
""" Computes the density of fragments as a function of radius
Notes
-----
Computes the density at different radii and due to each ejection
angle, performing the radial integration of eq. (36), Festou 1981
with only one fragment velocity. The resulting units will be in
1/(m^3) as we work in m, s, and m/s.
The density is first found on a radial grid, then interpolated to
find density as a function of arbitrary radius. We use our results
from the grid to calculate the total number of fragments in the
coma for comparison to the theoretical number we expect, to provide
the user with a rough idea of how well the chosen radial and
angular grid sizes have captured the appropriate amount of
particles. Note that some level of disagreement is expected
because the parent_destruction_level and fragment_destruction_level
parameters cut the grid off before all particles can dissociate,
and thus some escape the model and come up missing in the fragment
count based on the grid.
"""
vp = self.parent['v_outflow']
vf = self.fragment['v_photo']
# Follow fragments until they have been totally destroyed
time_limit = self.max_fragment_lifetimes * self.fragment['tau_T']
r_coma = self.vmodel['coma_radius'].value
r_limit = r_coma
# temporary radial array for when we loop through 0 to epsilonMax
ejection_radii = np.zeros(self.radial_substeps)
p_tau_T = self.parent['tau_T']
f_tau_T = self.fragment['tau_T']
p_tau_d = self.parent['tau_d']
# Compute this once ahead of time
# More factors to fill out integral similar to eq. (36) Festou 1981
integration_factor = (
(1 / (4 * np.pi * p_tau_d)) *
self.vmodel['d_alpha'] / (4.0 * np.pi)
)
# Calculate the density contributions over the volume of the comet
# atmosphere due to one ejection axis
for j in range(0, self.angular_points):
cur_angle = self.vmodel['angular_grid'][j]
# Loop through the radial points along this axis
for i in range(0, self.radial_points):
cur_r = self.vmodel['fast_radial_grid'][i]
x = cur_r * np.sin(cur_angle)
y = cur_r * np.cos(cur_angle)
# Decide how granular our epsilon should be
d_epsilon_steps = len(ejection_radii)
d_epsilon = (
(self.vmodel['epsilon_max'] - cur_angle)
/ d_epsilon_steps
)
# Maximum radius that contributes to point x,y when there is a
# a max ejection angle
if(self.vmodel['epsilon_max'] < np.pi):
r_limit = y - (x / np.tan(self.vmodel['epsilon_max']))
# Set the last element to be r_coma or the above limit
ejection_radii[d_epsilon_steps - 1] = r_limit
# We already filled out the very last element in the array
# above, so it's d_epsilon_steps - 1
for dE in range(0, d_epsilon_steps - 1):
ejection_radii[dE] = (
y -
x / np.tan((dE + 1) * d_epsilon + cur_angle)
)
ejection_radii_start = 0
# Number of slices along the contributing axis for each step
num_slices = self.angular_substeps
# Loop over radial chunk that contributes to x,y
for ejection_radii_end in ejection_radii:
# We are slicing up this axis into pieces
dr = (
(ejection_radii_end - ejection_radii_start) /
num_slices
)
# Loop over tiny slices along this chunk
for m in range(0, num_slices):
# TODO: We could probably eliminate m by making a
# linear space from ejection_radii_start to
# ejection_radii_end
# Current distance along contributing axis
cur_r = (m + 0.5) * dr + ejection_radii_start
# This is the distance from the NP axis point to the
# current point on the ray, squared
sep_dist = np.sqrt(x * x + (cur_r - y) * (cur_r - y))
cos_eject = (y - cur_r) / sep_dist
sin_eject = x / sep_dist
# Calculate sqrt(vR^2 - u^2 sin^2 gamma)
v_factor = np.sqrt(
vf * vf - (vp * vp) * sin_eject ** 2)
# The first (and largest) of the two solutions for the
# velocity when it arrives
v_one = vp * cos_eject + v_factor
# Time taken to travel from the dissociation point at
# v1, reject if the time is too large (and all
# fragments have decayed)
t_frag_one = sep_dist / v_one
if t_frag_one > time_limit:
continue
# This is the total time between parent emission from
# nucleus and fragment arriving at our point of
# interest, which we then use to look up Q at that time
# in the past
t_total_one = (cur_r / vp) + t_frag_one
# Division by parent velocity makes this production per
# unit distance for radial integration q(r, epsilon)
# given by eq. 32, Festou 1981
prod_one = self.production_at_time(t_total_one) / vp
q_r_eps_one = (
(v_one * v_one * prod_one) /
(vf * np.abs(v_one - vp * cos_eject))
)
# Parent extinction when traveling along to the
# dissociation site
p_extinction = np.e ** (-cur_r / (p_tau_T * vp))
# Fragment extinction when traveling at speed v1
f_extinction_one = np.e ** (-t_frag_one / f_tau_T)
# First differential addition to the density
# integrating along dr, similar to eq. (36) Festou
# 1981, due to the first velocity
n_r_one = (
(p_extinction * f_extinction_one * q_r_eps_one) /
(sep_dist ** 2 * v_one)
)
# Add this contribution to the density grid
self.vmodel['density_grid'][i][j] = (
self.vmodel['density_grid'][i][j] +
n_r_one * dr
)
# Check if there is a second solution for the velocity
if vf > vp:
continue
# Compute the contribution from the second solution for
# v in the same way
v_two = vp * cos_eject - v_factor
t_frag_two = sep_dist / v_two
if t_frag_two > time_limit:
continue
t_total_two = (cur_r / vp) + t_frag_two
prod_two = self.production_at_time(t_total_two) / vp
q_r_eps_two = (
(v_two * v_two * prod_two) /
(vf * np.abs(v_two - vp * cos_eject))
)
f_extinction_two = np.e ** (-t_frag_two / f_tau_T)
n_r_two = (
(p_extinction * f_extinction_two * q_r_eps_two) /
(v_two * sep_dist ** 2)
)
self.vmodel['density_grid'][i][j] = (
self.vmodel['density_grid'][i][j] +
n_r_two * dr
)
# Next starting radial point is the current end point
ejection_radii_start = ejection_radii_end
if(self.print_progress is True):
progress_percent = (j + 1) * 100 / self.angular_points
print(f'Computing: {progress_percent:3.1f} %', end='\r')
# Loops automatically over the 2d grid
self.vmodel['density_grid'] *= integration_factor
# phew
"""
Performs angular part of the integration to yield density in m^-3
as a function of radius. Assumes spherical symmetry of parent
production.
Fills vmodel['radial_density'] and vmodel['fast_radial_density']
with and without units respectively
Fills vmodel['r_dens_interpolation'] with cubic spline
interpolation of the radial density, which takes radial coordinate
in m and outputs the density at that coord in m^-3, without units
attached
"""
# Make array to hold our data, no units
self.vmodel['fast_radial_density'] = np.zeros(self.radial_points)
# loop through grid array
for i in range(0, self.radial_points):
for j in range(0, self.angular_points):
# Current angle is theta
theta = self.vmodel['angular_grid'][j]
# Integration factors from angular part of integral, similar to
# eq. (36) Festou 1981
dens_contribution = (
2.0 * np.pi * np.sin(theta) *
self.vmodel['density_grid'][i][j]
)
self.vmodel['fast_radial_density'][i] += dens_contribution
# Tag with proper units
self.vmodel['radial_density'] = (
self.vmodel['fast_radial_density'] / (u.m ** 3)
)
def _interpolate_radial_density(self):
""" Interpolate the radial density.
Takes our fragment density grid and constructs density as a function of
arbitrary radius
"""
if not scipy:
raise RequiredPackageUnavailable('scipy')
# Interpolate this radial density grid with a cubic spline for lookup
# at non-grid radii, input in m, output in 1/m^3
self.vmodel['r_dens_interpolation'] = (
CubicSpline(self.vmodel['fast_radial_grid'],
self.vmodel['fast_radial_density'],
bc_type='natural')
)
def _column_density_at_rho(self, rho):
""" Calculate the column density of fragments at a given impact parameter
Parameters
----------
rho : float
Impact parameter of the column density integration, in meters
Returns
-------
float
Column density at the given impact parameter in m^-2, no
astropy units attached
Notes
-----
We return zero column density beyond the edge of our grid, so if
there is still significant column density near the edge of the grid
this can lead to strange graphing results and sharp cutoffs.
"""
r_max = self.vmodel['max_grid_radius'].value
if(rho > r_max):
return 0
rhosq = rho ** 2
z_max = np.sqrt(r_max ** 2 - rhosq)
def column_density_integrand(z):
return self.vmodel['r_dens_interpolation'](np.sqrt(z ** 2 + rhosq))
# Romberg is significantly slower for impact parameters near the
# nucleus, and becomes much faster at roughly 60 times the collision
# sphere radius, after a few tests
# The results of both were the same to within .1% or better, generally
# TODO: test this for a range of productions to see if this holds
if rho < (60 * self.vmodel['collision_sphere_radius'].value):
c_dens = (
quad(column_density_integrand,
-z_max, z_max, limit=3000)
)[0]
else:
c_dens = (
2 * romberg(column_density_integrand,
0, z_max, rtol=0.0001, divmax=20)
)
# result is in 1/m^2
return c_dens
def _compute_column_density(self):
""" Compute the column density on the grid and interpolate the results
Computes the fragment column density on a grid and interpolates for
fragment column density as a function of arbitrary radius.
Notes
-----
The interpolator returns column density in m^-2, no astropy units
attached
"""
if not scipy:
raise RequiredPackageUnavailable('scipy')
# make a grid for the column density values
column_density_grid = self._make_radial_logspace_grid()
# vectorize so we can hand the whole grid to our column density
# computing function in one line
cd_vectorized = np.vectorize(self._column_density_at_rho)
# array now holds corresponding column density values
column_densities = cd_vectorized(column_density_grid)
self.vmodel['fast_column_density_grid'] = column_density_grid
self.vmodel['column_density_grid'] = column_density_grid * u.m
self.vmodel['column_densities'] = column_densities / (u.m ** 2)
# Interpolation gives column density in m^-2
self.vmodel['column_density_interpolation'] = (
CubicSpline(
column_density_grid,
column_densities,
bc_type='natural'
)
)
[docs] def calc_num_fragments_theory(self):
""" The total number of fragment species we expect in the coma
Returns
-------
float
Total number of fragment species we expect in the coma
theoretically
Notes
-----
This needs to be rewritten to better handle time dependence; the
original implementation was designed for abrupt but small parent
production changes.
"""
# TODO: Re-implement this as differential equations with parent
# photodissociation as a source of fragments, and fragment
# photodissociation as a sink
vp = self.parent['v_outflow']
vf = self.fragment['v_photo']
p_tau_T = self.parent['tau_T']
f_tau_T = self.fragment['tau_T']
p_tau_d = self.parent['tau_d']
t_perm = self.vmodel['t_perm_flow'].to(u.s).value
alpha = f_tau_T * p_tau_T / p_tau_d
max_r = self.vmodel['max_grid_radius'].value
last_density_element = len(self.vmodel['fast_radial_density']) - 1
edge_adjust = (
(np.pi * max_r * max_r * (vf + vp) *
self.vmodel['fast_radial_density'][last_density_element])
)
num_time_slices = 10000
# array starting at tperm (farthest back in time we expect something to
# hang around), stepped down to zero (when the observation takes place)
time_slices = np.linspace(t_perm, 0, num_time_slices, endpoint=False)
theory_total = 0
for i, t in enumerate(time_slices[:-1], start=0):
if t > t_perm:
t1 = t_perm
else:
t1 = t
t1 /= p_tau_T
if i != (num_time_slices - 1):
t2 = time_slices[i + 1]
else:
t2 = 0
t2 /= p_tau_T
mult_factor = -np.e ** (-t1) + np.e ** (-t2)
theory_total += self.production_at_time(t) * mult_factor
theory_total *= alpha
theory_total -= edge_adjust
return theory_total
[docs] def calc_num_fragments_grid(self):
""" Total number of fragments in the coma.
Calculates the total number of fragment species by integrating the
density grid over its volume
Returns
-------
float
Number of fragments in the coma based on our grid calculations
Notes
-----
Outbursts/time dependent production in general will make this
result poor due to the grid being sized to capture a certain
fraction of parents/fragments at the oldest (first) production
rate. The farther you get from this base production, the farther
the model will deviate from capturing the requested percentage of
particles.
"""
max_r = self.vmodel['max_grid_radius'].value
def vol_integrand(r, r_func):
return (r_func(r) * r ** 2)
r_int = romberg(
vol_integrand, 0, max_r,
args=(self.vmodel['r_dens_interpolation'], ),
rtol=0.0001, divmax=20)
return 4 * np.pi * r_int
def _column_density(self, rho):
""" Gives fragment column density at arbitrary impact parameter
Parameters
----------
rho : float
Impact parameter, in meters, no astropy units attached
Returns
-------
float
Fragment column density at given impact parameter, in m^-2
"""
return self.vmodel['column_density_interpolation'](rho)
def _volume_density(self, r):
""" Gives fragment volume density at arbitrary radius
Parameters
----------
r : float
Distance from nucles, in meters, no astropy units attached
Returns
-------
float
Fragment volume density at specified radius
"""
return self.vmodel['r_dens_interpolation'](r)