Source code for firefly.data_reader.FIREreader

import numpy as np
import os

from .reader import Reader,ParticleGroup

from abg_python.snapshot_utils import openSnapshot,iterativeCoM
from abg_python.cosmo_utils import load_rockstar

class FIREreader(Reader):
    """This is an example of a "custom" Reader that has been tuned to open data from
    the `FIRE galaxy formation collaboration <https://fire.northwestern.edu>`_.
    """
    def __init__(
        self,
        snapdir,
        snapnum,
        ptypes=None,
        UInames=None,
        decimation_factors=None,
        fields=None,
        filterFlags=None,
        colormapFlags=None,
        radiusFlags=None,
        logFlags=None,
        **kwargs):
        """Base initialization method for FIREreader instances.
        A FIREreader will conveniently read `FIRE collaboration <http://fire.northwestern.edu>`_
        data and produce firefly compatible :code:`.json` files.

        :param snapdir: directory that contains all the hdf5 data files
        :type snapdir: str
        :param snapnum: which snapshot to open
        :type snapnum: int
        :param ptypes: which particle types to extract (e.g. :code:`'PartType%d'`),
            defaults to []
        :type ptypes: list of int, optional
        :param UInames: what the particle groups should be called in the webapp UI,
            defaults to :code:`['PartType%d'%i for i in ptypes]`
        :type UInames: list of str, optional
        :param decimation_factors: factor by which to reduce the data randomly,
            i.e. :code:`data=data[::decimation_factor]`,
            defaults to :code:`[1 for i in ptypes]`
        :type decimation_factors: list of int, optional
        :param fields: names of fields to open from snapshot data
            (e.g. Temperature, AgeGyr, Density). Shared between particle types,
            but if a particle type does not have a field
            (e.g. PartType4 does not have Temperature, PartType0 does not have AgeGyr)
            then that field is skipped for that particle type, defaults to []
        :type fields: list of str, optional
        :param filterFlags: flags to signal whether a field should be in the filter dropdown,
            defaults to [True for i in fields]
        :type filterFlags: list of bool, optional
        :param colormapFlags: flags to signal whether a field should be in the colormap dropdown,
            defaults to [True for i in fields]
        :type colormapFlags: list of bool, optional
        :param radiusFlags: flags to signal whether a field should be in the radius dropdown,
            defaults to [False for i in fields]
        :type radiusFlags: list of bool, optional
        :param logFlags: flags to signal whether the log of the field should be taken,
            defaults to [False for i in fields]
        :type logFlags: list of bool, optional
        :raises ValueError: if the lengths of ptypes, UInames, and decimation_factors do not match
        :raises ValueError: if the lengths of fields, filterFlags, colormapFlags,
            radiusFlags, and logFlags do not match
        :raises FileNotFoundError: if snapdir cannot be found
        """

        ## handle default input for particle groups
        if ptypes is None: ptypes = []
        if UInames is None: UInames = ["PartType%d" % i for i in ptypes]
        if decimation_factors is None: decimation_factors = [1 for i in ptypes]

        ## handle default input for fields
        if fields is None: fields = []
        if filterFlags is None: filterFlags = [True for i in fields]
        if colormapFlags is None: colormapFlags = [True for i in fields]
        if radiusFlags is None: radiusFlags = [False for i in fields]
        if logFlags is None: logFlags = [False for i in fields]

        ## input validation
        ##  ptypes
        lists = [decimation_factors,UInames]
        names = ['decimation_factors','UInames']
        for name,llist in zip(names,lists):
            if len(llist) != len(ptypes):
                raise ValueError(
                    "%s is not the same length as ptypes (%d,%d)"%(
                    name,len(llist),len(ptypes)))

        ##  fields
        lists = [filterFlags,colormapFlags,radiusFlags,logFlags]
        names = ['filterFlags','colormapFlags','radiusFlags','logFlags']
        for name,llist in zip(names,lists):
            if len(llist) != len(fields):
                raise ValueError(
                    "%s is not the same length as fields (%d,%d)"%(
                    name,len(llist),len(fields)))

        ##  IO/snapshots
        if not os.path.isdir(snapdir):
            raise FileNotFoundError("Cannot find %s"%snapdir)

        ## Coordinates are handled separately
        if 'Coordinates' in fields:
            print("Do not put Coordinates in fields, removing it... (and its corresponding flags)")
            fields = list(fields)
            filterFlags = list(filterFlags)
            colormapFlags = list(colormapFlags)
            radiusFlags = list(radiusFlags)
            logFlags = list(logFlags)

            index = fields.index('Coordinates')
            ## pop from every flag list so they stay aligned with fields
            for llist in [fields,filterFlags,colormapFlags,radiusFlags,logFlags]:
                llist.pop(index)

        ## where to find the HDF5 files
        self.snapdir = snapdir
        self.snapnum = snapnum

        ## which particles we want to extract
        self.ptypes = ptypes

        ## what we want to call those particles in the UI
        self.UInames = UInames

        ## do we want to decimate the arrays at all?
        self.decimation_factors = decimation_factors

        ## what attributes do we want to load of that particle type?
        self.fields = fields

        ## do we want to filter on that attribute?
        self.filterFlags = filterFlags

        ## do we want to color by that attribute?
        self.colormapFlags = colormapFlags

        ## do we want to scale particle size by that attribute?
        self.radiusFlags = radiusFlags

        ## do we need to take the log of it?
        self.logFlags = logFlags

        ####### execute generic Reader __init__ below #######
        super().__init__(**kwargs)
    def loadData(
        self,
        com=None,
        vcom=None):
        """Loads FIRE snapshot data using Alex Gurvich's
        :func:`firefly.data_reader.snapshot_utils.openSnapshot`
        (reproduced from https://github.com/agurvich/abg_python) and binds it to a
        corresponding :class:`firefly.data_reader.ParticleGroup` instance.

        :param com: position to offset all coordinates by;
            if None, the CoM will be calculated, defaults to None
        :type com: np.ndarray, optional
        :param vcom: velocity to offset all velocities by;
            if None, it will be calculated alongside the CoM (or set to 0), defaults to None
        :type vcom: np.ndarray, optional
        """

        for ptype,UIname,dec_factor in list(
            zip(self.ptypes,self.UInames,self.decimation_factors))[::-1]:
            print("Loading ptype %d"%ptype)

            ## load each particle type's snapshot data
            snapdict = openSnapshot(
                self.snapdir,
                self.snapnum,
                int(ptype), ## ptype should be 1,2,3,4,etc...
                keys_to_extract=['Coordinates']+self.fields+['Masses']*(com is None)+['Velocities'])

            if com is None:
                ## prefer a rockstar halo file if one is available,
                ##  otherwise compute the center of mass iteratively
                if os.path.isdir(os.path.join(self.snapdir,'..','halo','rockstar_dm')):
                    com,rvir,vcom = load_rockstar(
                        self.snapdir,
                        self.snapnum,
                        extra_names_to_read=['velocity'])
                else:
                    com,vcom = iterativeCoM(
                        snapdict['Coordinates'],
                        snapdict['Masses'],
                        snapdict['Velocities'])

            if vcom is None: vcom = 0

            ## shift to the center of mass frame
            snapdict['Coordinates'] -= com
            snapdict['Velocities'] -= vcom

            ## initialize output arrays for fields
            field_names = []
            field_arrays = []
            field_filter_flags = []
            field_colormap_flags = []
            field_radius_flags = []

            for field,filterFlag,colormapFlag,radiusFlag,logFlag in list(zip(
                self.fields,
                self.filterFlags,
                self.colormapFlags,
                self.radiusFlags,
                self.logFlags)):

                ## easy peasy, just read from the snapdict
                if field in snapdict:
                    arr = snapdict[field]
                ## if asked, compute galactocentric radius from the coordinates
                elif field == 'GCRadius':
                    arr = np.sqrt(np.sum(snapdict['Coordinates']**2,axis=1))
                ## elif field == ' ':
                    ## NOTE: define custom shortcut field here
                else: continue

                ## take the log if requested
                if logFlag:
                    arr = np.log10(arr)
                    field = 'log10%s'%field

                ## append this field into the arrays
                field_names = np.append(
                    field_names,
                    [field],axis=0)
                field_filter_flags = np.append(
                    field_filter_flags,
                    [filterFlag],axis=0)
                field_colormap_flags = np.append(
                    field_colormap_flags,
                    [colormapFlag],axis=0)
                field_radius_flags = np.append(
                    field_radius_flags,
                    [radiusFlag],axis=0)
                field_arrays.append(arr)

            ## initialize a ParticleGroup instance for this particle type
            self.particleGroups = np.append(
                self.particleGroups,
                [ParticleGroup(
                    UIname,
                    snapdict['Coordinates'],
                    snapdict['Velocities'],
                    field_names=field_names,
                    field_arrays=np.array(field_arrays).reshape(-1,snapdict['Coordinates'].shape[0]),
                    decimation_factor=dec_factor,
                    field_filter_flags=field_filter_flags,
                    field_colormap_flags=field_colormap_flags,
                    field_radius_flags=field_radius_flags)],
                axis=0)

        ## reverse the order to match the specified ptypes order
        self.particleGroups = self.particleGroups[::-1]

        ## save the filenames that were opened
        ##  so you can re-open them yourself in that order if you need to
        for particleGroup in self.particleGroups:
            particleGroup.filenames_opened = snapdict['fnames']

            ## add this particle group to the reader's settings file
            self.settings.attachSettings(particleGroup)

        return self.particleGroups
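
## ----------------------------------------------------------------------
## Example usage of FIREreader (a minimal sketch; the snapshot path and
##  directory below are hypothetical and the writeToDisk call relies on the
##  base Reader defaults, mirroring the calls made elsewhere in this module):
##
##     reader = FIREreader(
##         snapdir='/path/to/output/snapdir_600',  ## hypothetical location
##         snapnum=600,
##         ptypes=[0,4],
##         UInames=['Gas','Stars'],
##         decimation_factors=[10,10],
##         fields=['Temperature','AgeGyr','GCRadius'],
##         logFlags=[True,False,False],
##         JSONdir='FIREData_600')
##     reader.loadData()
##     reader.writeToDisk()
## ----------------------------------------------------------------------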
class SimpleFIREreader(FIREreader):
    def __init__(
        self,
        path_to_snapshot,
        decimation_factor=10,
        JSONdir=None,
        write_to_disk=True,
        com=False,
        **kwargs):
        """A wrapper to :class:`firefly.data_reader.FIREreader` that will open
        FIRE collaboration formatted data with minimal interaction from the user
        and use a "standard" firefly setup with:

            :code:`ptypes = [0,4]`

            :code:`UInames = ['Gas','Stars']`

            :code:`fields = ['AgeGyr','Temperature','GCRadius']`

        along with some "standard" settings:

            :code:`settings['color']['Gas'] = [1,0,0,1]`

            :code:`settings['color']['Stars'] = [0,0,1,1]`

            :code:`settings['sizeMult']['Gas'] = 1`

            :code:`settings['sizeMult']['Stars'] = 1`

            :code:`settings['camera'] = [0,0,-15]`

        :param path_to_snapshot: path to the .hdf5 file(s); can be a snapshot directory
        :type path_to_snapshot: str
        :param decimation_factor: factor by which to reduce the data randomly,
            i.e. :code:`data=data[::decimation_factor]`, defaults to 10
        :type decimation_factor: int, optional
        :param JSONdir: the sub-directory that will contain your JSON files,
            relative to your :code:`$HOME` directory,
            defaults to :code:`$HOME/<JSON_prefix>`
        :type JSONdir: str, optional
        :param write_to_disk: flag that controls whether data is saved to disk (:code:`True`)
            or only converted to a string and stored in :code:`self.JSON` (:code:`False`),
            defaults to True
        :type write_to_disk: bool, optional
        :param com: flag to offset all coordinates by the COM of the snapshot, defaults to False
        :type com: bool, optional
        :raises ValueError: if a snapnum cannot be inferred from the path_to_snapshot
        """

        ## strip off a trailing / if it's there
        if path_to_snapshot[-1:] == os.sep:
            path_to_snapshot = path_to_snapshot[:-1]

        if '.hdf5' not in path_to_snapshot:
            ## passed a snapdir_xxx directory containing multiple files
            snapdir = os.path.dirname(path_to_snapshot)
            try: snapnum = int(path_to_snapshot.split('_')[-1])
            except:
                raise ValueError(
                    "%s should be formatted as 'path/to/output/snapdir_xxx'"%path_to_snapshot+
                    " where xxx is an integer")
        else:
            ## passed a single snapshot_xxx.hdf5 file
            snapdir = os.path.dirname(path_to_snapshot)
            try: snapnum = int(path_to_snapshot.split('_')[-1][:-len('.hdf5')])
            except:
                raise ValueError(
                    "%s should be formatted as 'path/to/output/snapshot_xxx.hdf5'"%path_to_snapshot+
                    " where xxx is an integer")

        ## relative path -> symbolic link
        if JSONdir is None: JSONdir = "FIREData_%d"%snapnum

        ## initialize the reader object
        super().__init__(
            snapdir,
            snapnum,
            ptypes=[0,4],
            UInames=['Gas','Stars'],
            decimation_factors=[decimation_factor,decimation_factor],
            fields=['AgeGyr','Temperature','GCRadius'],
            logFlags=[False,True,False],
            JSON_prefix='Data',
            JSONdir=JSONdir,
            **kwargs)

        ## load the data
        self.loadData(com=com)

        self.settings['color']['Gas'] = [1,0,0,1]
        self.settings['color']['Stars'] = [0,0,1,1]

        self.settings['sizeMult']['Gas'] = 1
        self.settings['sizeMult']['Stars'] = 1

        self.settings['camera'] = [0,0,-15]

        ## dump the data files to disk
        self.writeToDisk(loud=True,write_to_disk=write_to_disk,extension='.ffly')
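
## ----------------------------------------------------------------------
## Example usage of SimpleFIREreader (a minimal sketch; the path below is
##  hypothetical):
##
##     reader = SimpleFIREreader('/path/to/output/snapdir_600')
##
## This decimates each particle group by a factor of 10, writes .ffly files
##  to $HOME/FIREData_600, and applies the "standard" settings listed in the
##  docstring above.
## ----------------------------------------------------------------------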
class STARFORGEreader(FIREreader):
    def __init__(
        self,
        path_to_snapshot,
        decimation_factor=10,
        JSONdir=None,
        write_to_disk=True,
        com=False,
        **kwargs):
        """A wrapper to :class:`firefly.data_reader.FIREreader` that will open
        `STARFORGE collaboration <http://starforge.space>`_ formatted data
        with minimal interaction from the user and use a "standard" firefly setup with:

            :code:`ptypes = [0,5]`

            :code:`UInames = ['Gas','Stars']`

            :code:`fields = ['AgeGyr','Temperature','GCRadius']`

        along with some "standard" settings:

            :code:`settings['color']['Gas'] = [1,0,0,1]`

            :code:`settings['color']['Stars'] = [0,0,1,1]`

            :code:`settings['sizeMult']['Gas'] = 1`

            :code:`settings['sizeMult']['Stars'] = 1`

            :code:`settings['camera'] = [0,0,-15]`

        :param path_to_snapshot: path to the .hdf5 file(s); can be a snapshot directory
        :type path_to_snapshot: str
        :param decimation_factor: factor by which to reduce the data randomly,
            i.e. :code:`data=data[::decimation_factor]`, defaults to 10
        :type decimation_factor: int, optional
        :param JSONdir: the sub-directory that will contain your JSON files,
            relative to your :code:`$HOME` directory,
            defaults to :code:`$HOME/<JSON_prefix>`
        :type JSONdir: str, optional
        :param write_to_disk: flag that controls whether data is saved to disk (:code:`True`)
            or only converted to a string and stored in :code:`self.JSON` (:code:`False`),
            defaults to True
        :type write_to_disk: bool, optional
        :param com: flag to offset all coordinates by the COM of the snapshot, defaults to False
        :type com: bool, optional
        :raises ValueError: if a snapnum cannot be inferred from the path_to_snapshot
        """

        ## strip off a trailing / if it's there
        if path_to_snapshot[-1:] == os.sep:
            path_to_snapshot = path_to_snapshot[:-1]

        snapdir = os.path.dirname(path_to_snapshot)
        try: snapnum = int(path_to_snapshot.split('_')[-1])
        except:
            raise ValueError(
                "%s should be formatted as 'path/to/output/snapdir_xxx'"%path_to_snapshot+
                " where xxx is an integer")

        ## relative path -> symbolic link
        if JSONdir is None: JSONdir = "STARFORGEData_%d"%snapnum

        ## initialize the reader object
        super().__init__(
            snapdir,
            snapnum,
            ptypes=[0,5],
            UInames=['Gas','Stars'],
            decimation_factors=[decimation_factor,decimation_factor],
            fields=['AgeGyr','Temperature','GCRadius'],
            logFlags=[False,True,False],
            JSON_prefix='Data',
            JSONdir=JSONdir,
            **kwargs)

        ## load the data
        self.loadData(com=com)

        self.settings['color']['Gas'] = [1,0,0,1]
        self.settings['color']['Stars'] = [0,0,1,1]

        self.settings['sizeMult']['Gas'] = 1
        self.settings['sizeMult']['Stars'] = 1

        self.settings['camera'] = [0,0,-15]

        ## dump the JSON files
        self.writeToDisk(loud=True,write_to_disk=write_to_disk)
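
## ----------------------------------------------------------------------
## Example usage of STARFORGEreader (a minimal sketch; the path below is
##  hypothetical):
##
##     reader = STARFORGEreader('/path/to/output/snapdir_100')
##
## This opens PartType0 (gas) and PartType5 (labeled 'Stars' in the UI) and
##  writes the output to $HOME/STARFORGEData_100.
## ----------------------------------------------------------------------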