Source code for tofu.data._comp

# -*- coding: utf-8 -*-

# Builtin
import warnings

# Common
import numpy as np
import scipy.signal as scpsig
import scipy.interpolate as scpinterp
import scipy.linalg as scplin
import scipy.stats as scpstats
from matplotlib.tri import LinearTriInterpolator as mplTriLinInterp


from . import _DataCollection_comp


_fmin_coef = 5.
_ANITYPE = 'sca'
_FILLVALUE = np.nan


#############################################
#############################################
#############################################
#       Spectrograms
#############################################
#############################################


[docs]def spectrogram(data, t, fmin=None, method='scipy-fourier', deg=False, window='hann', detrend='linear', nperseg=None, noverlap=None, boundary='constant', padded=True, wave='morlet', warn=True): # Format/check inputs lm = ['scipy-fourier', 'scipy-stft']#, 'scipy-wavelet'] if not method in lm: msg = "Alowed methods are:" msg += "\n - scipy-fourier: scipy.signal.spectrogram()" msg += "\n - scipy-stft: scipy.signal.stft()" #msg += "\n - scpipy-wavelet: scipy.signal.cwt()" raise Exception(msg) nt, nch = data.shape if t is not None: assert t.shape==(nt,) else: t = np.arange(0,nt) if not np.allclose(t,t[0]+np.mean(np.diff(t))*np.arange(0,nt)): msg = "Time vector does not seem to be regularly increasing !" raise Exception(msg) dt = np.mean(np.diff(t)) fs = 1./dt # Compute if method in ['scipy-fourier', 'scipy-stft']: stft = 'stft' in method f, tf, lpsd, lang = _spectrogram_scipy_fourier( data, fs, nt, nch, fmin=fmin, stft=stft, deg=deg, window=window, nperseg=nperseg, noverlap=noverlap, detrend=detrend, boundary=boundary, padded=padded, warn=warn, ) tf = tf + t[0] elif method=='scipy-wavelet': f, lspect = _spectrogram_scipy_wavelet(data, fs, nt, nch, fmin=fmin, wave=wave, warn=warn) tf = t.copy() return tf, f, lpsd, lang
[docs]def _spectrogram_scipy_fourier(data, fs, nt, nch, fmin=None, window=('tukey', 0.25), deg=False, nperseg=None, noverlap=None, detrend='linear', stft=False, boundary='constant', padded=True, warn=True): """ Return a spectrogram for each channel, and a common frequency vector The min frequency of interest fmin fixes the nb. of pt. per seg. (if None) The number of overlapping points is set to nperseg-1 if None The choice of the window type is a trade-off between: Spectral resolution between similar frequencies/amplitudes: => Dynamic range (lots of != frequencies of != amplitudes): => Compromise: => 'hann' """ # Check inputs if nperseg is None and fmin is None: fmin = _fmin_coef*(fs/nt) if warn: msg = "nperseg and fmin were not provided\n" msg += " => fmin automatically set to 10.*fs/nt:\n" msg += " fmin = 10.*{0} / {1} = {2} Hz".format(fs,nt,fmin) warnings.warn(msg) # Format inputs if nperseg is None: assert fmin > fs/nt nperseg = int(np.ceil(fs/fmin)) if nperseg%2==1: nperseg = nperseg + 1 if noverlap is None: noverlap = nperseg - 1 n = int(np.ceil(np.log(nperseg)/np.log(2))) nfft = 2**n # Prepare output if stft: f, tf, ssx = scpsig.stft(data, fs=fs, window=window, nperseg=nperseg, noverlap=noverlap, nfft=nfft, detrend=detrend, return_onesided=True, boundary=boundary, padded=padded, axis=0) else: f, tf, ssx = scpsig.spectrogram(data, fs=fs, window=window, nperseg=nperseg, noverlap=noverlap, nfft=nfft, detrend=detrend, return_onesided=True, scaling='density', axis=0, mode='complex') # Split in list (per channel) lssx = np.split(ssx, np.arange(1,nch), axis=1) lssx = [ss.squeeze().T for ss in lssx] lpsd = [np.abs(ss)**2 for ss in lssx] lang = [np.angle(ss, deg=deg) for ss in lssx] return f, tf, lpsd, lang
[docs]def _spectrogram_scipy_wavelet(data, fs, nt, nch, fmin=None, wave='morlet', warn=True): if wave!='morlet': msg = "Only the morlet wavelet implmented so far !" raise Exception(msg) # Check inputs if fmin is None: fmin = _fmin_coef*(fs/nt) if warn: msg = "fmin was not provided => set to 10.*fs/nt" warnings.warn(msg) nw = int((1./fmin-2./fs)*fs) widths = 2.*np.pi*np.linspace(fmin,fs/2.,nw) wave = eval('scpsig.%s'%wave) lcwt = [] for ii in range(0,nch): cwt = scpsig.cwt(data[:,ii], wave, widths) lcwt.append(np.abs(cwt)**2) f = widths/(2.*np.pi) return f, lcwt
############################################# ############################################# ############################################# # SVD ############################################# #############################################
[docs]def calc_svd(data, lapack_driver='gesdd'): chronos, s, topos = scplin.svd(data, full_matrices=True, compute_uv=True, overwrite_a=False, check_finite=True, lapack_driver=lapack_driver) # Test if reversed correlation lind = [np.nanargmax(np.std(data,axis=0)), np.nanargmax(np.mean(data,axis=0)), 0, data.shape[1]//2, -1] corr = np.zeros((len(lind),2)) for ii in range(0,len(lind)): corr[ii,:] = scpstats.pearsonr(chronos[:,0], data[:,lind[ii]]) ind = corr[:,1] < 0.05 if np.any(ind): corr = corr[ind,0] if corr[np.argmax(np.abs(corr))] < 0.: chronos, topos = -chronos, -topos return chronos, s, topos
############################################# ############################################# ############################################# # Filtering ############################################# #############################################
[docs]def filter_bandpass_fourier(t, data, method='stft', detrend='linear', df=None, harm=True, df_out=None, harm_out=True): """ Return bandpass FFT-filtered signal (and the rest) Optionnally include all higher harmonics Can also exclude a frequency interval and its higher harmonics Parameters ---------- t : np.ndarray 1D array, monotonously increasing time vector with regular spacing data : np.ndarray 1 or 2D array, with shape[0]=t.size, the data to be filtered method: str Flag indicating which method to use: - 'rfft': scipy.fftpack.rfft - 'stft': scipy.signal.stft df : None / list List or tuple of len()=2, containing the bandpass lower / upper bounds harm : bool If True all the higher harmonics of df will also be included df_out : None / list List or tuple of len()=2, containing the bandpass lower / upper bounds to be excluded from filtering (if overlaps with high harmonics of df) harm_out : bool If True, the higher harmonics of the interval df_out are also excluded Test : bool If True tests all the input arguments for any mistake Returns ------- In : np.ndarray Array with shape=data.shape, filtered signal retrieved from inverse FFT Out : np.ndarray Array with shape=data.shape, excluded signal from filtering """ # Check / format inputs assert data.ndim==2 and t.ndim==1 assert data.shape[0]==(t.size,) assert np.allclose(t,np.unique(t)) and np.std(np.diff(t))<=1.e-12 assert method in ['rfft','stft'] lC = [df is None, df_out is None] assert np.sum(lC)<=1, "At least one of df or df_out must be provided !" assert type(harm) is bool and type(harm_out) is bool if df is not None: df = np.unique(df) assert df.shape==(2,) if df_out is not None: df_out = np.unique(df_out) assert df_out.shape==(2,) nt, nch = data.shape dt = np.mean(np.diff(t)) fs = 1./dt if method == 'rfft': data_in, data_out = _filter_bandpass_rfft(data, t, dt, fs, nt, nch, df=df, harm=harm, df_out=df_out, harm_out=harm_out) elif method == 'stft': data_in, data_out = _filter_bandpass_stft(data, t, dt, fs, nt, nch, df=df, harm=harm, df_out=df_out, harm_out=harm_out)
[docs]def _filter_bandpass_rfft(data, t, dt, fs, nt, nch, df): # Get frequency f = np.fft.fftfreq(nt, dt) f = f[f>=0] if nt%2==0: f = np.append(f,dt*nt/2) nf = f.size # Get rfft A = np.fft.rfft(data, axis=0) assert A.shape[0]==nf, "Problem with frequency scale !" # Intervals of interest for reconstruction if df is None: indin = np.ones((nf,),dtype=bool) else: indin = (f>=df[0]) & (f<=df[1]) if harm: for ii in range(1,int(np.floor(f.max()/df[0]))): indin = indin | ((f>=df[0]*ii) & (f<=df[1]*ii)) if df_out is not None: indin = indin & ~((f>=df_out[0]) & (f<=df_out[1])) if harm_out: for ii in range(1,int(np.floor(f.max()/df_out[0]))): indin = indin & ~((f>=df_out[0]*ii) & (f<=df_out[1]*ii)) # Reconstructing Aphys = np.copy(A) Aphys[~indin,:] = 0 A[indin,:] = 0 return np.fft.irfft(Aphys, nt, axis=0), np.fft.irfft(A, nt, axis=0)
[docs]def _filter_bandpass_stft(data, t, dt, fs, nt, nch, df): # Get stft f, tf, ssx = scpsig.stft(data, fs=fs, window=window, nperseg=nperseg, noverlap=noverlap, nfft=nfft, detrend=False, return_onesided=True, boundary=boundary, padded=padded, axis=0) ssx = np.abs(ssx)**2 nf = f.size # Intervals of interest for reconstruction if df is None: indin = np.ones((nf,),dtype=bool) else: indin = (f>=df[0]) & (f<=df[1]) if harm: for ii in range(1,int(np.floor(f.max()/df[0]))): indin = indin | ((f>=df[0]*ii) & (f<=df[1]*ii)) if df_out is not None: indin = indin & ~((f>=df_out[0]) & (f<=df_out[1])) if harm_out: for ii in range(1,int(np.floor(f.max()/df_out[0]))): indin = indin & ~((f>=df_out[0]*ii) & (f<=df_out[1]*ii)) # Reconstructing ssxphys = np.copy(ssx) ssxphys[~indin,:] = 0 ssx[indin,:] = 0 data_in = scpsig.istft(ssxphys, fs=fs, window=window, nperseg=nperseg, noverlap=noverlap, nfft=nfft, input_onesided=True, boundary=boundary, time_axis=0, freq_axis=0) data_out = scpsig.istft(ssx, fs=fs, window=window, nperseg=nperseg, noverlap=noverlap, nfft=nfft, input_onesided=True, boundary=boundary, time_axis=0, freq_axis=0) return data_in, data_out
[docs]def filter_svd(data, lapack_driver='gesdd', modes=[]): """ Return the svd-filtered signal using only the selected mode Provide the indices of the modes desired """ # Check input modes = np.asarray(modes,dtype=int) assert modes.ndim==1 assert modes.size>=1, "No modes selected !" u, s, v = scplin.svd(data, full_matrices=False, compute_uv=True, overwrite_a=False, check_finite=True, lapack_driver=lapack_driver) indout = np.arange(0,s.size) indout = np.delete(indout, modes) data_in = np.dot(u[:,modes]*s[modes],v[modes,:]) data_out = np.dot(u[:,indout]*s[indout],v[indout,:]) return data_in, data_out
############################################# ############################################# ############################################# # Interpolating ############################################# #############################################
[docs]def get_finterp_isotropic( idquant, idref1d, idref2d, vquant=None, vr1=None, vr2=None, interp_t=None, interp_space=None, idmesh=None, fill_value=None, tall=None, tbinall=None, ntall=None, indtq=None, indtr1=None, indtr2=None, mpltri=None, trifind=None, ): if fill_value is None: fill_value = _FILLVALUE if interp_t is None: interp_t = 'nearest' # ----------------------------------- # Interpolate directly on 2d quantity # ----------------------------------- if idref1d is None: # -------------------- # Linear interpolation if interp_space == 1: def func(pts, vect=None, t=None, ntall=ntall, mplTriLinInterp=mplTriLinInterp, mpltri=mpltri, trifind=trifind, vquant=vquant, indtq=indtq, tall=tall, tbinall=tbinall, idref1d=idref1d, idref2d=idref2d): r, z = np.hypot(pts[0, :], pts[1, :]), pts[2, :] shapeval = list(pts.shape) shapeval[0] = ntall if t is None else t.size val = np.full(tuple(shapeval), fill_value) ntall, indt, indtu, indtq = _DataCollection_comp._get_indtu( t=t, tall=tall, tbinall=tbinall, indtq=indtq, )[1:-2] for ii in range(0, ntall): ind = indt == indtu[ii] val[ind, ...] = mplTriLinInterp( mpltri, vquant[indtq[indtu[ii]], :], trifinder=trifind, )(r, z).filled(fill_value) if np.any(np.isnan(val)) and not np.isnan(fill_value): val[np.isnan(val)] = fill_value if t is None and tall is not None: t = tall return val, t # -------------------- # Degree 0 interpolation else: def func(pts, vect=None, t=None, ntall=ntall, trifind=trifind, vquant=vquant, indtq=indtq, tall=tall, tbinall=tbinall, idref1d=idref1d, idref2d=idref2d): r, z = np.hypot(pts[0,:],pts[1,:]), pts[2,:] shapeval = list(pts.shape) shapeval[0] = ntall if t is None else t.size val = np.full(tuple(shapeval), fill_value) indpts = trifind(r,z) if isinstance(indpts, tuple): indr, indz = indpts indok = (indr > -1) & (indz > -1) indr = indr[indok] indz = indz[indok] else: indok = indpts > -1 indpts = indpts[indok] ntall, indt, indtu, indtq = _DataCollection_comp._get_indtu( t=t, tall=tall, tbinall=tbinall, indtq=indtq, )[1:-2] if isinstance(indpts, tuple): for ii in range(0, ntall): ind = indt == indtu[ii] val[ind, indok] = vquant[indtq[indtu[ii]], indr, indz] else: for ii in range(0, ntall): ind = indt == indtu[ii] val[ind, indok] = vquant[indtq[indtu[ii]], indpts] if np.any(np.isnan(val)) and not np.isnan(fill_value): val[np.isnan(val)] = fill_value if t is None and tall is not None: t = tall return val, t else: if interp_space == 1: def func(pts, vect=None, t=None, ntall=ntall, mplTriLinInterp=mplTriLinInterp, mpltri=mpltri, trifind=trifind, vquant=vquant, indtq=indtq, interp_space=interp_space, fill_value=fill_value, vr1=vr1, indtr1=indtr1, vr2=vr2, indtr2=indtr2, tall=tall, tbinall=tbinall, idref1d=idref1d, idref2d=idref2d): r, z = np.hypot(pts[0,:],pts[1,:]), pts[2,:] shapeval = list(pts.shape) shapeval[0] = ntall if t is None else t.size val = np.full(tuple(shapeval), fill_value) t0tri, t0int = 0., 0. ntall, indt, indtu, indtq, indtr1, indtr2 = \ _DataCollection_comp._get_indtu( t=t, tall=tall, tbinall=tbinall, indtq=indtq, indtr1=indtr1, indtr2=indtr2, )[1:] for ii in range(0, ntall): # get ref values for mapping vii = mplTriLinInterp( mpltri, vr2[indtr2[indtu[ii]], :], trifinder=trifind, )(r, z) # interpolate 1d ind = indt == indtu[ii] val[ind, ...] = scpinterp.interp1d( vr1[indtr1[indtu[ii]], :], vquant[indtq[indtu[ii]], :], kind='linear', bounds_error=False, fill_value=fill_value, )(np.asarray(vii)) val[np.isnan(val)] = fill_value if t is None and tall is not None: t = tall return val, t else: def func(pts, vect=None, t=None, ntall=ntall, trifind=trifind, vquant=vquant, indtq=indtq, interp_space=interp_space, fill_value=fill_value, vr1=vr1, indtr1=indtr1, vr2=vr2, indtr2=indtr2, tall=tall, tbinall=tbinall, idref1d=idref1d, idref2d=idref2d): r, z = np.hypot(pts[0,:],pts[1,:]), pts[2,:] shapeval = list(pts.shape) shapeval[0] = ntall if t is None else t.size val = np.full(tuple(shapeval), fill_value) indpts = trifind(r,z) if isinstance(indpts, tuple): indr, indz = indpts indok = (indr > -1) & (indz > -1) indr = indr[indok] indz = indz[indok] else: indok = indpts > -1 indpts = indpts[indok] ntall, indt, indtu, indtq, indtr1, indtr2 = \ _DataCollection_comp._get_indtu( t=t, tall=tall, tbinall=tbinall, indtq=indtq, indtr1=indtr1, indtr2=indtr2, )[1:] if isinstance(indpts, tuple): for ii in range(0, ntall): # interpolate 1d ind = indt == indtu[ii] val[ind, indok] = scpinterp.interp1d( vr1[indtr1[indtu[ii]], :], vquant[indtq[indtu[ii]], :], kind='linear', bounds_error=False, fill_value=fill_value, )(vr2[indtr2[indtu[ii]], indr, indz]) else: for ii in range(0, ntall): # interpolate 1d ind = indt == indtu[ii] val[ind, indok] = scpinterp.interp1d( vr1[indtr1[indtu[ii]], :], vquant[indtq[indtu[ii]], :], kind='linear', bounds_error=False, fill_value=fill_value, )(vr2[indtr2[indtu[ii]], indpts]) # Double check nan in case vr2 is itself nan on parts of mesh if np.any(np.isnan(val)) and not np.isnan(fill_value): val[np.isnan(val)] = fill_value if t is None and tall is not None: t = tall return val, t return func
[docs]def get_finterp_ani( idq2dR, idq2dPhi, idq2dZ, interp_t=None, interp_space=None, fill_value=None, mpltri=None, idmesh=None, vq2dR=None, vq2dPhi=None, vq2dZ=None, tall=None, tbinall=None, ntall=None, indtq=None, trifind=None, Type=None, ): # ----------------------------------- # Interpolate directly on 2d quantity # ----------------------------------- if Type is None: Type = _ANITYPE if fill_value is None: fill_value = _FILLVALUE if interp_t is None: interp_t = 'nearest' # -------------------- # Linear interpolation if interp_space == 1: def func(pts, vect=None, t=None, ntall=ntall, mplTriLinInterp=mplTriLinInterp, mpltri=mpltri, trifind=trifind, vq2dR=vq2dR, vq2dPhi=vq2dPhi, vq2dZ=vq2dZ, indtq=indtq, tall=tall, tbinall=tbinall): # Get pts in (r,z,phi) r, z = np.hypot(pts[0, :], pts[1, :]), pts[2, :] phi = np.arctan2(pts[1, :], pts[0, :]) # Deduce vect in (r,z,phi) vR = np.cos(phi)*vect[0, :] + np.sin(phi)*vect[1, :] vPhi = -np.sin(phi)*vect[0, :] + np.cos(phi)*vect[1, :] vZ = vect[2, :] # Prepare output shapeval = list(pts.shape) shapeval[0] = ntall if t is None else t.size valR = np.full(tuple(shapeval), fill_value) valPhi = np.full(tuple(shapeval), fill_value) valZ = np.full(tuple(shapeval), fill_value) # Interpolate ntall, indt, indtu, indtq = _DataCollection_comp._get_indtu( t=t, tall=tall, indtq=indtq, tbinall=tbinall, )[1:4] for ii in range(0, ntall): ind = indt == indtu[ii] valR[ind, ...] = mplTriLinInterp( mpltri, vq2dR[indtq[indtu[ii]], :], trifinder=trifind, )(r, z) valPhi[ind, ...] = mplTriLinInterp( mpltri, vq2dPhi[indtq[indtu[ii]], :], trifinder=trifind, )(r, z) valZ[ind, ...] = mplTriLinInterp( mpltri, vq2dZ[indtq[indtu[ii]], :], trifinder=trifind, )(r, z) if Type == 'sca': val = (valR*vR[None, :] + valPhi*vPhi[None, :] + valZ*vZ[None, :]) elif Type == 'abs(sca)': val = np.abs(valR*vR[None, :] + valPhi*vPhi[None, :] + valZ*vZ[None, :]) val[np.isnan(val)] = fill_value if t is None and tall is not None: t = tall return val, t # -------------------- # Degree 0 interpolation else: def func(pts, vect=None, t=None, ntall=ntall, trifind=trifind, vq2dR=vq2dR, vq2dPhi=vq2dPhi, vq2dZ=vq2dZ, indtq=indtq, tall=tall, tbinall=tbinall): # Get pts in (r,z,phi) r, z = np.hypot(pts[0, :], pts[1, :]), pts[2, :] phi = np.arctan2(pts[1, :], pts[0, :]) # Deduce vect in (r,z,phi) vR = np.cos(phi)*vect[0, :] + np.sin(phi)*vect[1, :] vPhi = -np.sin(phi)*vect[0, :] + np.cos(phi)*vect[1, :] vZ = vect[2, :] # Prepare output shapeval = list(pts.shape) shapeval[0] = ntall if t is None else t.size valR = np.full(tuple(shapeval), fill_value) valPhi = np.full(tuple(shapeval), fill_value) valZ = np.full(tuple(shapeval), fill_value) # Interpolate indpts = trifind(r, z) if isinstance(indpts, tuple): indr, indz = trifind(r, z) indok = (indr > -1) & (indz > -1) indr = indr[indok] indz = indz[indok] else: indok = indpts > -1 indpts = indpts[indok] ntall, indt, indtu, indtq = _DataCollection_comp._get_indtu( t=t, tall=tall, indtq=indtq, tbinall=tbinall, )[1:-2] if isinstance(indpts, tuple): for ii in range(0, ntall): ind = indt == indtu[ii] valR[ind, indok] = vq2dR[indtq[indtu[ii]], indr, indz] valPhi[ind, indok] = vq2dPhi[indtq[indtu[ii]], indr, indz] valZ[ind, indok] = vq2dZ[indtq[indtu[ii]], indr, indz] else: for ii in range(0, ntall): ind = indt == indtu[ii] valR[ind, indok] = vq2dR[indtq[indtu[ii]], indpts] valPhi[ind, indok] = vq2dPhi[indtq[indtu[ii]], indpts] valZ[ind, indok] = vq2dZ[indtq[indtu[ii]], indpts] if Type == 'sca': val = (valR*vR[None, :] + valPhi*vPhi[None, :] + valZ*vZ[None, :]) elif Type == 'abs(sca)': val = np.abs(valR*vR[None, :] + valPhi*vPhi[None, :] + valZ*vZ[None, :]) val[np.isnan(val)] = fill_value if t is None and tall is not None: t = tall return val, t return func