Source code for astro_tigress.tigress_read

import os
import os.path as osp
import numpy as np

from . import athena_read
from . import const
from . import ytathena as ya
from . import radmc


[docs]class Model: """Class containing the simulation model information. Parameters ---------- model_id: str name of the model, e.g. `R8_2pc`. """ def __init__(self, model_id, dir_master): self.dir_master = dir_master self.model_id = model_id self.dir_model = osp.join(dir_master, model_id, "") if osp.isdir(self.dir_model): subdirs = os.listdir(self.dir_model) ivtks = sorted([int(x) for x in subdirs if x.isnumeric()]) self.ivtks = np.array(ivtks) # list of vtk numbers of outputs # contained data sets, e.g. ["MHD", "chem", "CO_lines"] self.data_sets = {} for ivtk in self.ivtks: dir_ivtk = "{:s}{:04d}/".format(self.dir_model, ivtk) self.data_sets[ivtk] = [ d for d in os.listdir(dir_ivtk) if os.path.isdir(os.path.join(dir_ivtk, d)) ] self._load_hst() self._load_input() self.CO_lines = {} return def _load_hst(self): # check history file fn_hst = "{:s}history/MHD/{:s}.hst".format(self.dir_model, self.model_id) if osp.isfile(fn_hst): self.hst = athena_read.hst(fn_hst) self.fn_hst = fn_hst def _load_input(self): # read time output interval in input file fn_input_MHD = "{:s}input/MHD/{:s}.par".format(self.dir_model, self.model_id) # time units self.tunit = const.pc / const.km self.tunit_Myr = self.tunit / const.Myr if osp.isfile(fn_input_MHD): self.MHD_input = athena_read.athinput(fn_input_MHD) self.fn_input_MHD = fn_input_MHD for k in self.MHD_input.keys(): if k.startswith("output"): if self.MHD_input[k]["out_fmt"] == "vtk": self.dt_code = self.MHD_input[k]["dt"] self.dt_Myr = self.dt_code * self.tunit_Myr self.t_Myr = self.ivtks * self.dt_Myr
[docs] def show(self): """Print available data list""" print("Avalable data:") if hasattr(self, "fn_hst"): print(" history") if hasattr(self, "fn_input_MHD"): print(" input") for i in self.ivtks: print(" ivtk = {:d}".format(i), end=" ") for d in self.data_sets[i]: print(d, end=" ") print(" ")
[docs] def download(self, ivtk=300, dataset="MHD", Z=1.0, iline=1): """Download simulation data from a certain data set. Parameters ---------- ivtk: int vtk output number dataset: str, ["MHD", "chem", "CO_lines", "history", "input", "all"] name of the dataset or download all. Z: float, [0.5, 1, 2] metallicity for chemistry post-processing. iline: int, [1, 2] uppper level of CO rotational line. 1 for J=1-0, 2 for J=2-1 """ fn = self._set_filename(ivtk, Z=Z, iline=iline) if dataset == "all": for d, f in fn.items(): self._download_file(f) else: for d in np.atleast_1d(dataset): self._download_file(fn[d])
[docs] def load(self, ivtk, dataset="MHD", Z=1.0, iline=1, Tdect=0.0): """Load simulation data from a certain data set. Parameters ---------- ivtk: int vtk output number dataset: str or list, ["MHD", "chem", "CO_lines", "MHD_PI", "all"] name of the dataset or load all. Z: float, [0.5, 1, 2] metallicity for chemistry post-processing. iline: int, [1, 2] uppper level of CO rotational line. 1 for J=1-0, 2 for J=2-1 Tdect: float detection limit for atenna temperature in K for each velocity channel. """ fn = self._set_filename(ivtk, Z=Z, iline=iline, add_master=True) if dataset == "all": for d in self.data_sets[ivtk]: self._loadone(fn[d], d, iline=iline, Tdect=Tdect) else: for d in np.atleast_1d(dataset): self._loadone(fn[d], d, iline=iline, Tdect=Tdect)
def _loadone(self, f, d, iline=1, Tdect=0.0): if d == "MHD": self.MHD = DataMHD(f) elif d == "chem": self.chem = DataChem(f) elif d == "CO_lines": self.CO_lines[iline] = DataCO(f, iline, Tdect) elif d == "MHD_PI": self.MHD_PI = DataRad(f) else: msg = "ERROR: Model.load(): dataset name not recogonozed.\n" msg += 'dataset: {"MHD", "chem", "CO_lines", "MHD_PI"}' raise RuntimeError(msg) def _set_filename(self, ivtk, Z=1.0, iline=1, add_master=False): source_dir_ivtk = "{:s}/{:04d}/".format(self.model_id, ivtk) if add_master: source_dir_ivtk = osp.join(self.dir_master, source_dir_ivtk) dict_Z = {0.5: "Z05", 1.0: "Z1", 2.0: "Z2"} Z_id = dict_Z[Z] fn = dict() fn["MHD"] = "{:s}MHD/{:s}.{:04d}.vtk".format( source_dir_ivtk, self.model_id, ivtk ) fn["MHD_PI"] = "{:s}MHD_PI/{:s}.{:04d}.vtk".format( source_dir_ivtk, self.model_id, ivtk ) fn["chem"] = "{:s}chem/{:s}/{:s}-{:s}.{:04d}.athdf".format( source_dir_ivtk, Z_id, self.model_id, Z_id, ivtk ) fn["CO_lines"] = "{:s}CO_lines/{:s}/il{:d}/{:s}-{:s}.il{:d}.{:04d}.bout".format( source_dir_ivtk, Z_id, iline, self.model_id, Z_id, iline, ivtk ) fn["history"] = "{0:s}/history/MHD/{0:s}.hst".format(self.model_id) fn["input"] = "{0:s}/input/MHD/{0:s}.par".format(self.model_id) return fn def _download_file(self, f): import urllib.request from urllib.error import URLError import shutil target = osp.join(self.dir_master, f) if osp.isfile(target): print("{} ({:.5f}GB) already exists".format(f, osp.getsize(target) / 2**30)) while True: answer = input("overwrite? [y/n]:") if answer.lower() in ["y", "n"]: break if answer.lower() == "n": return os.makedirs(osp.dirname(target), exist_ok=True) url = "https://tigress-web.princeton.edu/~munan/astro-tigress/" source = url + f req = urllib.request.Request(source) try: urllib.request.urlopen(req) except URLError as e: if hasattr(e, "reason"): print("We failed to reach a server.") print("Reason: ", e.reason) elif hasattr(e, "code"): print("The server couldn't fulfill the request.") print("Error code: ", e.code) else: print("We reached ", url) print(" downloading...", osp.basename(source), end=" ") with urllib.request.urlopen(source) as response, open( target, "wb" ) as target: shutil.copyfileobj(response, target) # urllib.request.urlretrieve(source,target) print(" complete!")
class DataMHD: """MHD data in one time snapshot. Parameters ---------- fn: str filename. """ def __init__(self, fn): self.fn = fn self.ytds = ya.load_athena4p2_mhd(fn) # yt data self.grid = ya.get_covering_grid(self.ytds) # yt covering grid return class DataRad: """UV data in one time snapshot. Parameters ---------- fn: str filename. """ def __init__(self, fn): self.fn = fn self.ytds = ya.load_athena4p2_rad(fn) # yt data self.grid = ya.get_covering_grid(self.ytds) # yt covering grid return class DataChem: """Chemistry data in one time snapshot. Parameters ---------- fn: str filename. """ def __init__(self, fn): self.fn = fn self.ytds = ya.load_athenapp(fn) # yt data self.grid = ya.get_covering_grid(self.ytds) # yt covering grid return def get_col(self, spec, axis=2): """Calculate the column density of species. Parameters ---------- spec: str name of species, e.g. "H2". axis: int, [0, 1, 2] axis where the column is calculated along. 0:x, 1:y, 2:z. Returns ------- col: YTarray column density, e.g. N_H2. """ dx = self.grid["dx"][0, 0, 0] nH = self.grid["nH"] abd = self.grid[spec] ns = abd * nH col = ns.sum(axis=2) * dx return col class DataCO: """CO emission lines PPV in one time snapshot. Parameters ---------- fn: str filename iline: int, [1, 2] index for CO rotaional line upper level. Tdect: float detection limit for atenna temperature in K for each velocity channel. """ def __init__(self, fn, iline, Tdect=0.0): if iline == 1: self.lam0 = 2600.75763346 elif iline == 2: self.lam0 = 1300.4036558 else: msg = "ERROR: DataCO.__init__(): " msg += "Line index {:d} not recognized (iline=[1,2]).".format(iline) raise RuntimeError(msg) self.fn = fn self.Tdect = Tdect self.img = radmc.Images(fn, lam0=self.lam0) # PPV cube self.WCO = self.img.getimageWCO(Tdect=Tdect) # CO line intensity in K*km/s self.Tpeak = self.img.getTpeak(Tdect=Tdect) # peak atenna temperature in K # brightness weighted mean velocity and velocity dispersion in km/s self.vz, self.sigmav = self.img.getimage_velocities(Tdect=Tdect) return