Source code for tofu.openadas2tofu._read_files


# Built-in
import os
import re
import itertools as itt
import warnings

# Common
import numpy as np
from scipy.interpolate import RectBivariateSpline as scpRectSpl


__all__ = ['step03_read', 'step03_read_all']


_DTYPES = {'adf11': ['acd', 'ccd', 'scd', 'plt', 'prb'],
           'adf15': None}
_DEG = 1            # default degree of the 2d interpolating splines
_PECASFUNC = True   # by default, return PECs as interpolating functions


# #############################################################################
# #############################################################################
#                       Utility functions
# #############################################################################


def _get_PATH_LOCAL():
    pfe = os.path.join(os.path.expanduser('~'), '.tofu', 'openadas2tofu')
    if os.path.isdir(pfe):
        return pfe
    else:
        return None


def _get_subdir_from_pattern(path, pattern, mult=None):
    """ Get list of files matching patterns in path

    If no match => Exception
    If multiple matches
        => mult = True: pass
        => mult = 'warn': warning
        => mult = 'err': Exception
    """
    # Check inputs
    if mult is None:
        mult = 'err'
    if mult not in [True, 'warn', 'err']:
        msg = (
            "Arg mult must be in [True, 'warn', 'err']!\n"
            + "\t- provided: {}".format(mult)
        )
        raise Exception(msg)

    if isinstance(pattern, str):
        pattern = [pattern]

    # Get matches
    ld = [
        dd for dd in os.listdir(path)
        if os.path.isdir(os.path.join(path, dd))
        and all([pp in dd for pp in pattern])
    ]
    if len(ld) != 1:
        msg = ("You have no / many directories in your local "
               + "~/.tofu/openadas2tofu/ matching the desired file type:\n"
               + "\t- path: {}\n".format(path)
               + "\t- provided (all): {}\n".format(pattern)
               + "\t- available: {}\n".format(ld)
               + "  => download the data with "
               + "tf.openadas2tofu.step02_download()")
        if len(ld) == 0:
            raise Exception(msg)
        else:
            if mult == 'err':
                raise Exception(msg)
            elif mult == 'warn':
                warnings.warn(msg)
    return [os.path.join(path, dd) for dd in ld]

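# Illustrative usage sketch of the two helpers above (comments only; the 'ar'
# patterns are example values, mirroring how step03_read_all() calls them):
#
#     path_local = _get_PATH_LOCAL()
#     # a unique 'adf15' sub-directory is expected => mult='err'
#     path = _get_subdir_from_pattern(path_local, 'adf15', mult='err')[0]
#     # possibly several 'pec...][ar' sub-directories => mult=True
#     lp = _get_subdir_from_pattern(path, ['pec', '][ar'], mult=True)
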

def _get_available_elements_from_path(path=None, typ1=None):
    # Check inputs
    if not os.path.isdir(path):
        msg = (
            "Provided path is not an existing directory!\n"
            + "\t- provided: {}".format(path)
        )
        raise Exception(msg)

    ltyp = ['adf15']
    if typ1 not in ltyp:
        msg = (
            "Only the following types of files are handled up to now:\n"
            + "\t- handled: {}\n".format(ltyp)
            + "\t- provided: {}".format(typ1)
        )
        raise Exception(msg)

    if typ1 == 'adf15':
        lf = [
            ff for ff in os.listdir(path)
            if all([ss in ff for ss in ['pec', '][']])
        ]
        element = [ff[ff.index('][')+2:] for ff in lf]
    return element

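# Illustrative note on the element extraction above: an adf15 directory name
# such as 'pec40][ar' (as in the step03_read() docstring example) yields the
# element symbol 'ar', i.e. everything after the '][' marker.
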

def _format_for_DataCollection_adf15(
    dout,
    dsource0=None,
    dref0=None,
    ddata0=None,
    dlines0=None,
):
    """
    Format dout from step03_read_all() for SPectralLines object
    (separated te, ne, ions, sources, lines)
    """

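    # Each dout entry (as produced by _read_adf15 / step03_read_all) is
    # expected to provide at least the keys used below, e.g. (illustrative
    # values only):
    #     {'ion': 'Ar16+', 'lambda0': 3.94e-10, 'symbol': ..., 'type': ...,
    #      'transition': ..., 'source': <adf15 file>,
    #      'te': (nte,) array, 'te_units': 'eV',
    #      'ne': (nne,) array, 'ne_units': '/m3',
    #      'pec': (nne, nte) array or callable, 'pec_units': 'm3/s'}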
    # Remove already known lines of dlines0 provided
    if dlines0 is not None:
        # Check for mistakes
        dk0 = {
            k0: [
                k1 for k1, v1 in dlines0.items()
                if np.sum([(
                    v1['ion'] == v0['ion']
                    and v1['transition'] == v0['transition'],
                    v1['ion'] == v0['ion'] and v1['symbol'] == v0['symbol']
                )]) == 1
            ]
            for k0, v0 in dout.items()
        }
        dk0 = {k0: v0 for k0, v0 in dk0.items() if len(v0) > 0}
        if len(dk0) > 0:
            msg = (
                "\nPossible error in openadas detected,\n"
                + "the following lines have same ion and transition but "
                + "different symbol (typ0typ1-isoel):\n"
                + "\n".join([
                    "\t- {}: {}".format(k0, v0) for k0, v0 in dk0.items()
                ])
                + "\n\n  => There might be redundancy / errors in openadas"
            )
            warnings.warn(msg)

        dout = {
            k0: v0 for k0, v0 in dout.items()
            if not any([
                v1['ion'] == v0['ion'] and v1['transition'] == v0['transition']
                for v1 in dlines0.values()
            ])
        }

    # Get dict of unique sources
    lsource = sorted(set([v0['source'] for v0 in dout.values()]))
    dsource = {}
    if dsource0 is None:
        dsource = {
            'oa-adf15-{:02}'.format(ii): {'long': ss}
            for ii, ss in enumerate(lsource)
        }

    else:
        # Check against existing sources
        nmax = int(np.max([
            int(k0.split('-')[-1])
            for k0 in dsource0.keys() if 'oa-adf15' in k0
        ])) + 1
        for ii, ss in enumerate(lsource):
            lk0 = [k0 for k0, v0 in dsource0.items() if v0['long'] == ss]
            if len(lk0) == 0:
                k0 = 'oa-adf15-{:02}'.format(nmax)
                nmax += 1
            elif len(lk0) == 1:
                k0 = lk0[0]
            else:
                msg = (
                    "\nMultiple possible matches for source {}".format(ss)
                )
                raise Exception(msg)
            dsource[k0] = {'long': ss}

    # Get dict of unique Te and ne
    dte, dne = {}, {}
    if dref0 is None:
        ite, ine = 0, 0
    else:
        ite = int(np.max([
            int(k0.split('-')[-1]) for k0 in dref0.keys()
            if 'Te-' in k0 and 'oa-adf15' in ddata0[k0]['source']
        ])) + 1
        ine = int(np.max([
            int(k0.split('-')[-1]) for k0 in dref0.keys()
            if 'ne-' in k0 and 'oa-adf15' in ddata0[k0]['source']
        ])) + 1

    for k0, v0 in dout.items():

        # Get source
        sour = [
            k1 for k1, v1 in dsource.items() if v1['long'] == v0['source']
        ][0]

        # fill dte
        kte = [
            kk for kk, vv in dte.items()
            if v0['te'].shape == vv['data'].shape
            and np.allclose(v0['te'], vv['data'])
            and sour == vv['source']
        ]
        normal = dref0 is None
        if normal is False:
            # Check vs existing Te
            lk0 = [
                k1 for k1, v1 in dref0.items()
                if ddata0[k1]['source'] == sour
                and v0['te'].shape == ddata0[k1]['data'].shape
                and np.allclose(v0['te'], ddata0[k1]['data'])
            ]
            if len(lk0) == 0:
                normal = True
            elif len(lk0) == 1:
                keyte = lk0[0]
                dte[keyte] = {
                    'data': ddata0[lk0[0]]['data'],
                    'units': v0['te_units'],
                    'source': sour,
                    'dim': 'temperature',
                    'quant': 'Te',
                    'name': 'Te',
                    'group': 'Te',
                }
            elif len(lk0) > 1:
                msg = (
                    "Multiple matches for dout[{}] in dref0:\n".format(k0)
                    + "\t- {}".format(lk0)
                )
                raise Exception(msg)

        if normal is True:
            if len(kte) == 0:
                keyte = 'Te-{:02}'.format(ite)
                dte[keyte] = {
                    'data': v0['te'],
                    'units': v0['te_units'],
                    'source': sour,
                    'dim': 'temperature',
                    'quant': 'Te',
                    'name': 'Te',
                    'group': 'Te',
                }
                ite += 1
            elif len(kte) == 1:
                pass
            else:
                msg = (
                    "len(kte) != 1:\n"
                    + "\t- kte = {}\n".format(kte)
                )
                raise Exception(msg)
        dout[k0]['keyte'] = keyte

        # fill dne
        kne = [
            kk for kk, vv in dne.items()
            if v0['ne'].shape == vv['data'].shape
            and np.allclose(v0['ne'], vv['data'])
            and sour == vv['source']
        ]
        normal = dref0 is None
        if normal is False:
            # Check vs existing ne
            lk0 = [
                k1 for k1, v1 in dref0.items()
                if ddata0[k1]['source'] == sour
                and v0['ne'].shape == ddata0[k1]['data'].shape
                and np.allclose(v0['ne'], ddata0[k1]['data'])
            ]
            if len(lk0) == 0:
                normal = True
            elif len(lk0) == 1:
                keyne = lk0[0]
                dne[keyne] = {
                    'data': ddata0[lk0[0]]['data'],
                    'units': v0['ne_units'],
                    'source': sour,
                    'dim': 'density',
                    'quant': 'ne',
                    'name': 'ne',
                    'group': 'ne',
                }
            elif len(lk0) > 1:
                msg = (
                    "Multiple matches for dout[{}] in dref0:\n".format(k0)
                    + "\t- {}".format(lk0)
                )
                raise Exception(msg)

        if normal is True:
            if len(kne) == 0:
                keyne = 'ne-{:02}'.format(ine)
                dne[keyne] = {
                    'data': v0['ne'],
                    'units': v0['ne_units'],
                    'source': sour,
                    'dim': 'density',
                    'quant': 'ne',
                    'name': 'ne',
                    'group': 'ne',
                }
                ine += 1
            elif len(kne) == 1:
                pass
            else:
                msg = (
                    "len(kne) != 1:\n"
                    + "\t- kne = {}\n".format(kne)
                )
                raise Exception(msg)
        dout[k0]['keyne'] = keyne

    # Get dict of pec
    dpec = {
        '{}-pec'.format(k0): {
            'data': v0['pec'], 'units': v0['pec_units'],
            'ref': (v0['keyne'], v0['keyte']),
            'source': [
                k1 for k1, v1 in dsource.items()
                if v1['long'] == v0['source']
            ][0],
            'dim': '<sigma v>',
            'quant': 'pec',
        }
        for k0, v0 in dout.items()
    }

    # dlines
    inds = np.argsort([v0['lambda0'] for v0 in dout.values()])
    lk0 = np.array(list(dout.keys()), dtype=str)[inds]
    dlines = {
        k0: {
            'ion': dout[k0]['ion'],
            'source': [
                k1 for k1, v1 in dsource.items()
                if v1['long'] == dout[k0]['source']
            ][0],
            'lambda0': dout[k0]['lambda0'],
            'pec': '{}-pec'.format(k0),
            'symbol': dout[k0]['symbol'],
            'type': dout[k0]['type'],
            'transition': dout[k0]['transition'],
        }
        for k0 in lk0
    }

    # Get dict of unique ions
    lion = sorted(set([v0['ion'] for v0 in dout.values()]))

    return dne, dte, dpec, lion, dsource, dlines


# #############################################################################
# #############################################################################
#                       Main functions
# #############################################################################


def step03_read(
    adas_path,
    pec_as_func=None,
    **kwdargs,
):
    """ Read openadas-formatted files and return a dict with the data

    Provide the full adas file name

    The result is returned as a dict

    example
    -------
        >>> import tofu as tf
        >>> fn = '/adf11/scd74/scd74_ar.dat'
        >>> out = tf.openadas2tofu.step03_read(fn)
        >>> fn = '/adf15/pec40][ar/pec40][ar_ca][ar16.dat'
        >>> out = tf.openadas2tofu.step03_read(fn)
    """
    path_local = _get_PATH_LOCAL()

    # Check whether the local .tofu repo exists, if not recommend tofu-custom
    if path_local is None:
        path = os.path.join(os.path.expanduser('~'), '.tofu', 'openadas2tofu')
        msg = ("You do not seem to have a local ./tofu repository\n"
               + "tofu uses that local repository to store all user-specific "
               + "data and downloads\n"
               + "In particular, openadas files are downloaded and saved in:\n"
               + "\t{}\n".format(path)
               + " => to set up your local .tofu repo, run in a terminal:\n"
               + "\ttofu-custom")
        raise Exception(msg)

    # Determine whether adas_path is an absolute path or an adas full name
    if os.path.isfile(adas_path):
        pfe = adas_path
    else:
        # make sure adas_path is not understood as absolute local path
        if adas_path[0] == '/':
            adas_path = adas_path[1:]

        # Check file was downloaded locally
        pfe = os.path.join(path_local, adas_path)
        if not os.path.isfile(pfe):
            msg = ("Provided file does not seem to exist:\n"
                   + "\t{}\n".format(pfe)
                   + " => Search it online with "
                   + "tofu.openadas2tofu.step01_search_online()\n"
                   + " => Download it with "
                   + "tofu.openadas2tofu.step02_download()")
            raise FileNotFoundError(msg)

    lc = [ss for ss in _DTYPES.keys() if ss in pfe]
    if not len(lc) == 1:
        msg = ("File type could not be derived from absolute path:\n"
               + "\t- provided: {}\n".format(pfe)
               + "\t- supported: {}".format(sorted(_DTYPES.keys())))
        raise Exception(msg)

    # forward pec_as_func to the adf15 reader (as done in step03_read_all())
    if lc[0] == 'adf15':
        kwdargs['pec_as_func'] = pec_as_func

    func = eval('_read_{}'.format(lc[0]))
    return func(pfe, **kwdargs)
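
# Illustrative usage sketch for the adf15 output of step03_read() (file name
# taken from the docstring example above; the Te / ne values are arbitrary):
#
#     out = step03_read('/adf15/pec40][ar/pec40][ar_ca][ar16.dat')
#     for key, vd in out.items():
#         # with pec_as_func=True (the default), vd['pec'] is a callable
#         # interpolating the PEC in (ne [/m3], Te [eV])
#         val = vd['pec'](Te=2.e3, ne=1.e19)
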

def step03_read_all(
    element=None,
    charge=None,
    typ1=None,
    typ2=None,
    pec_as_func=None,
    format_for_DataCollection=None,
    dsource0=None,
    dref0=None,
    ddata0=None,
    dlines0=None,
    verb=None,
    **kwdargs,
):
    """ Read all relevant openadas files for chosen typ1

    Please specify:
        - typ1:
            - 'adf11': ionisation / recombination data
            - 'adf15': pec data
        - element: the symbol of the element

    If typ1 = 'adf11', you can also provide typ2 to specify the coefficients:
        - 'scd': effective ionisation coefficients
        - 'acd': effective electron-impact recombination coefficients
        - 'ccd': effective hydrogen-impact recombination coefficients
        - 'plt': line power due to electron-impact excitation
        - 'prc': line power due to hydrogen-impact excitation
        - 'prb': rad. recombination and bremsstrahlung due to electron-impact

    If typ1 = 'adf15', you can optionally provide a min/max wavelength

    The result is returned as a dict

    examples
    --------
        >>> import tofu as tf
        >>> dout = tf.openadas2tofu.step03_read_all(element='ar', typ1='adf11')
        >>> dout = tf.openadas2tofu.step03_read_all(
        ...     element='ar', typ1='adf15', charge=16,
        ...     lambmin=3.94e-10, lambmax=4.e-10)
    """

    # --------------------
    # Check whether the local .tofu repo exists, if not recommend tofu-custom
    path_local = _get_PATH_LOCAL()
    if path_local is None:
        path = os.path.join(os.path.expanduser('~'), '.tofu', 'openadas2tofu')
        msg = ("You do not seem to have a local ./tofu repository\n"
               + "tofu uses that local repository to store all user-specific "
               + "data and downloads\n"
               + "In particular, openadas files are downloaded and saved in:\n"
               + "\t{}\n".format(path)
               + " => to set up your local .tofu repo, run in a terminal:\n"
               + "\ttofu-custom")
        raise Exception(msg)

    # --------------------
    # Check / format input
    if typ1 is None:
        typ1 = 'adf15'
    if not isinstance(typ1, str) or typ1.lower() not in _DTYPES.keys():
        msg = ("Please choose a valid adas file type:\n"
               + "\t- allowed: {}\n".format(_DTYPES.keys())
               + "\t- provided: {}".format(typ1))
        raise Exception(msg)
    typ1 = typ1.lower()

    if typ1 == 'adf11' and typ2 is None:
        typ2 = _DTYPES[typ1]
        fd = os.listdir(os.path.join(path_local, typ1))
        typ2 = [sorted([ss for ss in fd if tt in ss])[-1] for tt in typ2]
    if isinstance(typ2, str):
        typ2 = [typ2]
    if (_DTYPES[typ1] is not None
            and (not isinstance(typ2, list)
                 or not all([any([s1 in ss for s1 in _DTYPES[typ1]])
                             for ss in typ2]))):
        msg = ("typ2 must be a list of valid openadas file types for typ1:\n"
               + "\t- provided: {}\n".format(typ2)
               + "\t- available for {}: {}".format(typ1, _DTYPES[typ1]))
        raise Exception(msg)

    # --------------------
    # Get relevant directory

    # Level 1: Type
    path = _get_subdir_from_pattern(path_local, typ1, mult='err')[0]

    # --------------------
    # element
    c0 = (
        element is None
        or isinstance(element, str)
        or (
            isinstance(element, list)
            and all([isinstance(ee, str) for ee in element])
        )
        or (
            isinstance(element, tuple)
            and all([isinstance(ee, str) for ee in element])
        )
    )
    if not c0:
        msg = "Please choose an element!"
        raise Exception(msg)
    if element is None or isinstance(element, tuple):
        el = _get_available_elements_from_path(path=path, typ1=typ1)
        if element is None:
            element = el
        else:
            element = tuple([ee.lower() for ee in element])
            element = [ee for ee in el if ee not in element]
    if isinstance(element, str):
        element = [element]
    element = [ee.lower() for ee in element]

    # --------------------
    # charge
    if charge is not None:
        c0 = (
            isinstance(charge, int)
            or (
                isinstance(charge, list)
                and all([isinstance(cc, int) for cc in charge])
            )
            or (
                isinstance(charge, tuple)
                and all([isinstance(cc, int) for cc in charge])
            )
        )
        if not c0:
            msg = ("Arg charge must be an int or list (e.g.: 16 or [0])\n"
                   + "\t- provided: {}".format(charge))
            raise Exception(msg)
        if isinstance(charge, int):
            charge = [charge]
        if isinstance(charge, list):
            charge = ['{}.dat'.format(cc) for cc in charge]
        elif isinstance(charge, tuple):
            charge = tuple(['{}.dat'.format(cc) for cc in charge])

    if format_for_DataCollection is None:
        format_for_DataCollection = False
    if verb is None:
        verb = True

    # --------------------
    # Get list of relevant directories per element

    # Level 2: element or typ2
    if typ1 == 'adf11':
        lpath = [
            _get_subdir_from_pattern(path, tt, mult='err')[0] for tt in typ2
        ]
    elif typ1 == 'adf15':
        lpath = np.concatenate([
            _get_subdir_from_pattern(
                path, ['pec', '][{}'.format(ee)], mult=True,
            )
            for ee in element
        ]).tolist()

    # --------------------
    # Get list of relevant files pfe
    lpfe = list(itt.chain.from_iterable([
        [
            os.path.join(path, ff) for ff in os.listdir(path)
            if (
                os.path.isfile(os.path.join(path, ff))
                and ff[-4:] == '.dat'
                and any(['][{}'.format(ee) in ff for ee in element])
            )
        ]
        for path in lpath
    ]))

    if typ1 == 'adf15':
        kwdargs['pec_as_func'] = pec_as_func
        if isinstance(charge, list):
            lpfe = [
                ff for ff in lpfe
                if any([
                    ''.join([ss for ss in ff.split('][')[-1]
                             if ss.isdigit()]) + '.dat' == cc
                    for cc in charge
                ])
            ]
        elif isinstance(charge, tuple):
            lpfe = [
                ff for ff in lpfe
                if not any([
                    ''.join([ss for ss in ff.split('][')[-1]
                             if ss.isdigit()]) + '.dat' == cc
                    for cc in charge
                ])
            ]

    # --------------------
    # Extract data from each file
    func = eval('_read_{}'.format(typ1))
    dout = {}
    for pfe in lpfe:
        if verb is True:
            msg = "\tLoading data from {}".format(pfe)
            print(msg)
        dout = func(pfe, dout=dout, **kwdargs)

    if typ1 == 'adf15' and format_for_DataCollection is True:
        return _format_for_DataCollection_adf15(
            dout,
            dsource0=dsource0,
            dref0=dref0,
            ddata0=ddata0,
            dlines0=dlines0,
        )
    else:
        return dout
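
# Illustrative usage sketch for format_for_DataCollection=True (arguments
# reproduce the docstring example above):
#
#     dne, dte, dpec, lion, dsource, dlines = step03_read_all(
#         element='ar', typ1='adf15', charge=16,
#         lambmin=3.94e-10, lambmax=4.e-10,
#         format_for_DataCollection=True,
#     )
#     # dlines maps each line key to its ion, lambda0, symbol, transition,
#     # source and the key of its pec entry in dpec
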

# #############################################################################
#                   Specialized functions for ADF 11
# #############################################################################


def _read_adf11(pfe, deg=None, dout=None):

    if deg is None:
        deg = _DEG
    if dout is None:
        dout = {}

    # Get second order file type
    typ1 = [vv for vv in _DTYPES['adf11'] if vv in pfe]
    if len(typ1) != 1:
        msg = ("Second order file type could not be inferred from file name!\n"
               + "\t- available: {}\n".format(_DTYPES['adf11'])
               + "\t- provided: {}".format(pfe))
        raise Exception(msg)
    typ1 = typ1[0]

    # Get element
    elem = pfe[:-4].split('_')[1]

    comline = '-'*60
    comline2 = 'C'+comline
    if typ1 in ['acd', 'ccd', 'scd', 'plt', 'prb']:
        # read blocks
        with open(pfe) as search:
            for ii, line in enumerate(search):
                if comline2 in line:
                    break

                # Get atomic number (transitions) stored in this file
                if ii == 0:
                    lstr = line.split('/')
                    lin = [ss for ss in lstr[0].strip().split(' ')
                           if ss.strip() != '']
                    lc = [
                        len(lin) == 5 and all([ss.isdigit() for ss in lin]),
                        elem.upper() in lstr[1],
                        # 'ADF11' in lstr[-1],
                    ]
                    if not all(lc):
                        msg = ("File header format seems to have changed!\n"
                               + "\t- pfe: {}\n".format(pfe)
                               + "\t- lc = {}\n".format(lc)
                               + "\t- lstr = {}".format(lstr))
                        raise Exception(msg)
                    Z, nne, nte, q0, qend = map(int, lin)
                    nelog10 = np.array([])
                    telog10 = np.array([])
                    in_ne = True
                    continue

                if comline in line:
                    continue

                # Get nelog10
                if in_ne:
                    li = [ss for ss in line.strip().split(' ')
                          if ss.strip() != '']
                    nelog10 = np.append(nelog10, np.array(li, dtype=float))
                    if nelog10.size == nne:
                        in_ne = False
                        in_te = True

                # Get telog10
                elif in_te is True:
                    li = [ss for ss in line.strip().split(' ')
                          if ss.strip() != '']
                    telog10 = np.append(telog10, np.array(li, dtype=float))
                    if telog10.size == nte:
                        in_te = False
                        in_ion = True

                # Get ion block
                elif (in_ion is True and 'Z1=' in line
                      and ('------/' in line and 'DATE=' in line)):
                    nion = int(
                        line[line.index('Z1=')+len('Z1='):].split('/')[0])
                    if typ1 in ['scd', 'plt']:
                        charge = nion - 1
                    else:
                        charge = nion
                    coefslog10 = np.array([])

                elif in_ion is True and charge is not None:
                    li = [ss for ss in line.strip().split(' ')
                          if ss.strip() != '']
                    coefslog10 = np.append(
                        coefslog10, np.array(li, dtype=float))
                    if coefslog10.size == nne*nte:
                        key = '{}{}'.format(elem, charge)
                        tkv = [('element', elem), ('Z', Z),
                               ('charge', charge)]
                        if key in dout.keys():
                            assert all([dout[key][ss] == vv
                                        for ss, vv in tkv])
                        else:
                            dout[key] = {ss: vv for ss, vv in tkv}

                        if typ1 == 'scd':
                            # nelog10+6 to convert /cm3 -> /m3
                            # coefslog10-6 to convert cm3/s -> m3/s
                            func = scpRectSpl(
                                nelog10+6, telog10,
                                coefslog10.reshape((nne, nte))-6,
                                kx=deg, ky=deg)
                            dout[key]['ionis'] = {'func': func,
                                                  'type': 'log10_nete',
                                                  'units': 'log10(m3/s)',
                                                  'source': pfe}
                        elif typ1 == 'acd':
                            # nelog10+6 to convert /cm3 -> /m3
                            # coefslog10-6 to convert cm3/s -> m3/s
                            func = scpRectSpl(
                                nelog10+6, telog10,
                                coefslog10.reshape((nne, nte))-6,
                                kx=deg, ky=deg)
                            dout[key]['recomb'] = {'func': func,
                                                   'type': 'log10_nete',
                                                   'units': 'log10(m3/s)',
                                                   'source': pfe}
                        elif typ1 == 'ccd':
                            # nelog10+6 to convert /cm3 -> /m3
                            # coefslog10-6 to convert cm3/s -> m3/s
                            func = scpRectSpl(
                                nelog10+6, telog10,
                                coefslog10.reshape((nne, nte))-6,
                                kx=deg, ky=deg)
                            dout[key]['recomb_ce'] = {'func': func,
                                                      'type': 'log10_nete',
                                                      'units': 'log10(m3/s)',
                                                      'source': pfe}
                        elif typ1 == 'plt':
                            # nelog10+6 to convert /cm3 -> /m3
                            # coefslog10+6 to convert W.cm3 -> W.m3
                            func = scpRectSpl(
                                nelog10+6, telog10,
                                coefslog10.reshape((nne, nte))+6,
                                kx=deg, ky=deg)
                            dout[key]['rad_bb'] = {'func': func,
                                                   'type': 'log10_nete',
                                                   'units': 'log10(W.m3)',
                                                   'source': pfe}
                        elif typ1 == 'prb':
                            # nelog10+6 to convert /cm3 -> /m3
                            # coefslog10+6 to convert W.cm3 -> W.m3
                            func = scpRectSpl(
                                nelog10+6, telog10,
                                coefslog10.reshape((nne, nte))+6,
                                kx=deg, ky=deg)
                            dout[key]['rad_fffb'] = {'func': func,
                                                     'type': 'log10_nete',
                                                     'units': 'log10(W.m3)',
                                                     'source': pfe}
                        if nion == Z:
                            break
    return dout


# #############################################################################
#                   Specialized functions for ADF 15
# #############################################################################


def _get_adf15_key(elem, charge, isoel, typ0, typ1):
    return '{}{}_{}_oa_{}_{}'.format(elem, charge, isoel, typ0, typ1)


def _read_adf15(
    pfe,
    dout=None,
    lambmin=None,
    lambmax=None,
    pec_as_func=None,
    deg=None,
):
    """ Here lambmin and lambmax are provided in m """
    if deg is None:
        deg = _DEG
    if pec_as_func is None:
        pec_as_func = _PECASFUNC
    if dout is None:
        dout = {}

    # Get summary of transitions
    flagblock = '/isel ='
    flag0 = 'superstage partition information'

    # Get file markers from name (elem, charge, typ0, typ1)
    typ0, typ1, elemq = pfe.split('][')[1:]
    ind = re.search(r'\d', elemq).start()
    elem = elemq[:ind].title()
    charge = int(elemq[ind:-4])
    assert elem.lower() in typ0[:2]
    assert elem.lower() == typ1.split('_')[0]
    typ0 = typ0[len(elem)+1:]
    typ1 = typ1.split('_')[1]

    # Extract data from file
    nlines, nblock = None, 0
    in_ne, in_te, in_pec, in_tab, itab = False, False, False, False, np.inf
    skip = False
    with open(pfe) as search:
        for ii, line in enumerate(search):

            # Get number of lines (transitions) stored in this file
            if ii == 0:
                lstr = line.split('/')
                nlines = int(lstr[0].replace(' ', ''))
                continue

            # Get info about the transition being scanned (block)
            if flagblock in line and 'C' not in line and nblock < nlines:
                lstr = [kk for kk in line.rstrip().split(' ')
                        if len(kk) > 0]
                lamb = float(lstr[0])*1.e-10
                isoel = nblock + 1
                nblock += 1
                c0 = ((lambmin is not None and lamb < lambmin)
                      or (lambmax is not None and lamb > lambmax))
                if c0:
                    skip = True
                    continue
                skip = False
                nne, nte = int(lstr[1]), int(lstr[2])
                typ = [ss[ss.index('type=')+len('type='):ss.index('/ispb')]
                       for ss in lstr[3:] if 'type=' in ss]
                assert len(typ) == 1
                # To be updated: proper reading from line
                in_ne = True
                ne = np.array([])
                te = np.array([])
                pec = np.full((nne*nte,), np.nan)
                ind = 0
                # il = 0
                continue

            if 'root partition information' in line and skip is True:
                skip = False

            # Check lamb is ok
            if skip is True:
                continue

            # Get ne for the transition being scanned (block)
            if in_ne is True:
                ne = np.append(
                    ne,
                    np.array(line.rstrip().strip().split(' '), dtype=float))
                if ne.size == nne:
                    in_ne = False
                    in_te = True

            # Get te for the transition being scanned (block)
            elif in_te is True:
                te = np.append(
                    te,
                    np.array(line.rstrip().strip().split(' '), dtype=float))
                if te.size == nte:
                    in_te = False
                    in_pec = True

            # Get pec for the transition being scanned (block)
            elif in_pec is True:
                data = np.array(line.rstrip().strip().split(' '),
                                dtype=float)
                pec[ind:ind+data.size] = data
                # pec[il, :] = data
                ind += data.size
                # il += 1
                if ind == pec.size:
                    in_pec = False
                    key = _get_adf15_key(elem, charge, isoel, typ0, typ1)

                    # PEC reshaping and conversion from cm3/s to m3/s
                    pec = pec.reshape((nne, nte)) * 1e-6
                    # ne*1e6 to convert /cm3 -> /m3
                    ne = ne*1e6

                    if pec_as_func is True:
                        pec_rec = scpRectSpl(
                            np.log(ne), np.log(te), np.log(pec),
                            kx=deg, ky=deg,
                        )

                        def pec(Te=None, ne=None, pec_rec=pec_rec):
                            return np.exp(pec_rec(np.log(ne), np.log(Te)))

                    dout[key] = {
                        'lambda0': lamb,
                        'ion': '{}{}+'.format(elem, charge),
                        'charge': charge,
                        'element': elem,
                        'symbol': '{}{}-{}'.format(typ0, typ1, isoel),
                        'source': pfe,
                        'type': typ[0],
                        'ne': ne,
                        'ne_units': '/m3',
                        'te': te,
                        'te_units': 'eV',
                        'pec': pec,
                        'pec_type': 'f(ne, Te)',
                        'pec_units': 'm3/s',
                    }

            # Get transitions from table at the end
            if 'photon emissivity atomic transitions' in line:
                itab = ii + 6
            if ii == itab:
                in_tab = True
            if in_tab is True:
                lstr = [kk for kk in line.rstrip().split(' ')
                        if len(kk) > 0]
                isoel = int(lstr[1])
                lamb = float(lstr[2])*1.e-10
                key = _get_adf15_key(elem, charge, isoel, typ0, typ1)
                c0 = ((lambmin is None or lambmin < lamb)
                      and (lambmax is None or lambmax > lamb))
                if c0 and key not in dout.keys():
                    msg = ("Inconsistency in file {}:\n".format(pfe)
                           + "\t- line {} should be present".format(key))
                    raise Exception(msg)
                if key in dout.keys():
                    if dout[key]['lambda0'] != lamb:
                        msg = "Inconsistency in file {}".format(pfe)
                        raise Exception(msg)
                    c0 = (dout[key]['type'] not in lstr
                          or lstr.index(dout[key]['type']) < 4)
                    if c0:
                        msg = ("Inconsistency in table, type not found:\n"
                               + "\t- expected: {}\n".format(dout[key]['type'])
                               + "\t- line: {}".format(line))
                        raise Exception(msg)
                    trans = lstr[3:lstr.index(dout[key]['type'])]
                    dout[key]['transition'] = ''.join(trans)
                if isoel == nlines:
                    in_tab = False

    assert all(['transition' in vv.keys() for vv in dout.values()])
    return dout
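
# Illustrative usage sketch for the adf11 output (file name from the
# step03_read() docstring example; the log10(ne) / log10(Te) values are
# arbitrary, with ne in /m3 and Te presumably in eV as in the ADAS tables):
#
#     out = step03_read('/adf11/scd74/scd74_ar.dat')
#     # out['ar16']['ionis']['func'] is a RectBivariateSpline in
#     # (log10(ne), log10(Te)), returning log10 of the coefficient in m3/s
#     rate_log10 = out['ar16']['ionis']['func'](20., 3.)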