-
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathdvg_fftw_welchpowerspectrum.py
237 lines (186 loc) · 8.51 KB
/
dvg_fftw_welchpowerspectrum.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Performs lightning-fast power-spectrum calculations on 1D time series data
acquired at a fixed sampling frequency using Welch's method.
The fast-Fourier transform (FFT) is performed by the excellent `fftw`
(http://www.fftw.org/) library. It will plan the transformations ahead of time
to optimize the calculations. Also, multiple threads can be specified for the
FFT and, when set to > 1, the Python GIL will not be invoked. This results in
true multithreading across multiple cores, which can result in a huge
performance gain. It can outperform the `numpy` and `scipy` libraries by a
factor of > 8 in calculation speed.
Futher improvement to the calculation speed in this module comes from the use of
the `numba.njit()` decorator around arithmetic functions, releasing the Python
GIL as well.
"""
__author__ = "Dennis van Gils"
__authoremail__ = "[email protected]"
__url__ = "https://github.com/Dennis-van-Gils/python-dvg-signal-processing"
__date__ = "29-05-2021"
__version__ = "1.0.0"
# pylint: disable=invalid-name, missing-function-docstring, too-many-instance-attributes
import sys
import numpy as np
from scipy import signal
import pyfftw
from numba import njit
p_njit = {"nogil": True, "cache": True, "fastmath": False}
@njit("float64[:,:](float64[:], float64[:,:])", **p_njit)
def fast_multiply_window(window: np.ndarray, data: np.ndarray) -> np.ndarray:
return np.multiply(window, data)
@njit("float64[:,:](complex128[:,:], float64)", **p_njit)
def fast_conjugate_rescale(data: np.ndarray, scale: float) -> np.ndarray:
data = np.multiply(np.conjugate(data), data)
return np.multiply(np.real(data), scale)
@njit("float64[:,:](float64[:,:])", **p_njit)
def fast_transpose(data: np.ndarray) -> np.ndarray:
return np.transpose(data)
@njit("float64[:](float64[:])", **p_njit)
def fast_10log10(data: np.ndarray) -> np.ndarray:
return np.multiply(np.log10(data), 10)
# ------------------------------------------------------------------------------
# FFTW_WelchPowerSpectrum
# ------------------------------------------------------------------------------
class FFTW_WelchPowerSpectrum:
"""Manages a power-spectrum calculation on 1D time series data `data` as
passed to methods `compute_spectrum()` or `compute_spectrum_dB()`.
The input data array must always be of the same length as specified by
argument `len_data`. When the length of the passed input array is not equal
to the `len_data`, an array full of `numpy.nan`s is returned.
The Welch algorithm is based on: `scipy.signal.welch()` with hard-coded
defaults:
window = 'hanning'
noverlap = 50 %
detrend = False
scaling = 'spectrum'
mode = 'psd'
boundary = None
padded = False
sides = 'onesided'
Args:
len_data (int):
Full length of the upcoming input array `data` passed to methods
`compute_spectrum()` or `compute_spectrum_dB().
fs (float):
Sampling frequency of the time series data [Hz].
nperseg (float):
Length of each segment in Welch's method to average over.
fftw_threads (int, optional):
Number of threads to use for the FFT transformations. When set to
> 1, the Python GIL will not be invoked.
Default: 5
Attributes:
freqs (np.ndarray):
The frequency table in [Hz] corresponding to the power spectrum
output of `compute_spectrum()` and `compute_spectrum_dB()`.
"""
def __init__(self, len_data: int, fs: float, nperseg: int, fftw_threads=5):
nperseg = int(nperseg)
if nperseg > len_data:
print(
"nperseg = {0:d} is greater than input length "
" = {1:d}, using nperseg = {1:d}".format(nperseg, len_data)
)
nperseg = len_data
self.len_data = len_data
self.fs = fs
self.nperseg = nperseg
# Calculate the Hanning window in advance
self.win = signal.hann(nperseg, False)
self.scale = 1.0 / self.win.sum() ** 2 # For normalization
# Calculate the frequency table in advance
self.freqs = np.fft.rfftfreq(nperseg, 1 / fs)
# Prepare the FFTW plan
# fmt: off
self.noverlap = nperseg // 2
self.step = nperseg - self.noverlap
self.shape_in = ((len_data - self.noverlap) // self.step, nperseg)
self.shape_out = (
(len_data - self.noverlap) // self.step,
nperseg // 2 + 1,
)
# fmt: on
self._rfft_in = pyfftw.empty_aligned(self.shape_in, dtype="float64")
self._rfft_out = pyfftw.empty_aligned(
self.shape_out, dtype="complex128"
)
print("Creating FFTW plan for Welch power spectrum...", end="")
sys.stdout.flush()
self._fftw_welch = pyfftw.FFTW(
self._rfft_in,
self._rfft_out,
flags=("FFTW_MEASURE", "FFTW_DESTROY_INPUT"),
threads=fftw_threads,
)
print(" done.")
# --------------------------------------------------------------------------
# compute_spectrum
# --------------------------------------------------------------------------
def compute_spectrum(self, data: np.ndarray) -> np.ndarray:
"""Returns the power spectrum array of the passed 1D time series array
`data`. When `data` is in arbitrary units of [V], the output units will
be [V^2]. Use `compute_spectrum_dB()` to get the equivalent power ratio
in units of [dBV].
If `data` is not yet fully populated with data as specified by the
initialisation parameter `len_data`, this method will return an array
filled with `numpy.nan`.
Returns:
The power spectrum array as a 1D numpy array in units of [V^2].
"""
x = np.asarray(data)
if self.len_data != len(x):
return (
np.full(self.len_data, np.nan),
np.full(self.len_data, np.nan),
)
strides = (self.step * x.strides[-1], x.strides[-1])
Pxx_in = np.lib.stride_tricks.as_strided(
x, shape=self.shape_in, strides=strides
)
# Apply window
Pxx_in = fast_multiply_window(self.win, Pxx_in)
# Perform the fft
self._rfft_in[:] = Pxx_in # float64
Pxx = self._fftw_welch() # returns complex128
# Equivalent of:
# Pxx = np.conjugate(Pxx) * Pxx
# Pxx = Pxx.real * self.scale
Pxx = fast_conjugate_rescale(Pxx, self.scale)
if self.nperseg % 2:
Pxx[..., 1:] *= 2
else:
# Last point is unpaired Nyquist freq point, don't double
Pxx[..., 1:-1] *= 2
Pxx = fast_transpose(Pxx)
# Average over windows
if len(Pxx.shape) >= 2 and Pxx.size > 0:
if Pxx.shape[-1] > 1:
Pxx = Pxx.mean(axis=-1)
else:
Pxx = np.reshape(Pxx, Pxx.shape[:-1])
return Pxx
# --------------------------------------------------------------------------
# compute_spectrum_dB
# --------------------------------------------------------------------------
def compute_spectrum_dB(self, data: np.ndarray) -> np.ndarray:
"""Returns the power spectrum array of the passed 1D time series array
`data`. When `data` is in arbitrary units of [V], the output units will
be [dBV].
power [dBV]: `10 * log_10(P_in / P_ref)`, where `P_ref` = 1 [V].
If `data` is not yet fully populated with data as specified by the
initialisation parameter `len_data`, this method will return an array
filled with `numpy.nan`.
Physics note:
Technically, `data` should have units of power [W], hence the name
'power spectrum'. The output of this method will then have units of
[dBW]. However, if we measure a voltage, in order to calculate the
power we should also know the impedance `Z` to get to the electrical
power `P = V^2 / Z`. Because we don't always have access to the value
of the impedance, an engineering solution is to neglect the impedance
and simply use `V^2` as the power. Taking 1 `V^2` as the reference
'power', the power amplitude is now represented by the 'engineering'
units of [dBV], instead of [dBW].
Returns:
The power spectrum array as a 1D numpy array in units of [dBV].
"""
return fast_10log10(self.compute_spectrum(data))