Skip to content
Snippets Groups Projects
Commit dc311020 authored by Jean-Baptiste Bayle's avatar Jean-Baptiste Bayle
Browse files

Add instrument simulation

parent 8aafb998
No related branches found
No related tags found
1 merge request!2Add instrument simulation
This commit is part of merge request !2. Comments created here will be created in the context of that merge request.
......@@ -6,15 +6,4 @@ from .meta import __version__
from .meta import __author__
from .meta import __email__
from .noises import white
from .noises import violet
from .noises import pink
from .noises import red
from .noises import infrared
from .noises import laser
from .noises import clock
from .noises import modulation
from .noises import backlink
from .noises import ranging
from .noises import testmass
from .lisa import Instrument
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Digital signal processing.
Authors:
Jean-Baptiste Bayle <j2b.bayle@gmail.com>
"""
import logging
import scipy.sparse
import numpy
def timeshift(data, shift, order=31):
"""Shift `data` in time by `shift` samples.
Args:
data: array to be shited
shift: scalar or array of time shifts to be applied [samples]
order: interpolation order
"""
logging.debug("Time shifting data '%s' by '%s' samples (order=%d)", data, shift, order)
if numpy.isscalar(data):
logging.debug("Data is a constant scalar and cannot be time shifted")
return data
if numpy.all(shift == 0):
logging.debug("Time shift is vanishing and cannot be applied")
return data
data = numpy.asarray(data)
mode = 'timeseries' if isinstance(shift, numpy.ndarray) else 'constant'
logging.debug("Using mode '%s'", mode)
if mode == 'timeseries' and data.size != shift.size:
raise ValueError(f"`data` and `shift` must be of the same size (got {data.size}, {shift.size})")
num_tabs = order + 1
p = num_tabs // 2 # pylint: disable=invalid-name
size = data.size
k = numpy.floor(shift).astype(int)
eps = shift - k
coeffs = lagrange_coeffs(eps, num_tabs, p)
logging.debug("Using Lagrange coefficiens '%s'", coeffs)
if mode == 'timeseries':
logging.debug("Computing Lagrange matrix")
indices = numpy.arange(size)
i_min = numpy.min(k - (p - 1) + indices)
i_max = numpy.max(k + p + indices + 1)
csr_ind = numpy.tile(numpy.arange(num_tabs), size) + numpy.repeat(k + indices, num_tabs) - (p - 1)
csr_ptr = num_tabs * numpy.arange(size + 1)
mat = scipy.sparse.csr_matrix((numpy.ravel(coeffs), csr_ind - i_min, csr_ptr), shape=(size, i_max - i_min))
logging.debug("Padding data (left=%d, right=%d)", max(0, -i_min), max(0, i_max - size))
data_padded = numpy.pad(data[max(0, i_min):min(size, i_max)], (max(0, -i_min), max(0, i_max - size)))
logging.debug("Computing matrix-vector product")
shifted = mat.dot(data_padded)
elif mode == 'constant':
i_min = k - (p - 1)
i_max = k + p + size
logging.debug("Padding data (left=%d, right=%d)", max(0, -i_min), max(0, i_max - size))
data_padded = numpy.pad(data[max(0, i_min):min(size, i_max)], (max(0, -i_min), max(0, i_max - size)))
logging.debug("Computing correlation product")
shifted = numpy.correlate(data_padded, coeffs[0], mode="valid")
return shifted
def lagrange_coeffs(eps, num_tabs, p): # pylint: disable=invalid-name
"""Calculate coefficients for lagrange interpolation"""
coeffs = numpy.zeros([num_tabs, eps.size])
if p > 1:
factor = numpy.ones(eps.size, dtype=numpy.float64)
factor *= eps * (1 - eps)
for j in range(1, p):
factor *= (-1) * (1 - j / p) / (1 + j / p)
coeffs[p - 1 - j] = factor / (j + eps)
coeffs[p + j] = factor / (j + 1 - eps)
coeffs[p - 1] = 1 - eps
coeffs[p] = eps
for j in range(2, p):
coeffs *= 1 - (eps / j)**2
coeffs *= (1 + eps) * (1 - eps / p)
else:
coeffs[p - 1] = 1 - eps
coeffs[p] = eps
return coeffs.T
def time_shift(data, shift, order=31):
"""Shift data in time by tau
Args:
data (numpy.ndarray): data to be shited
tau (numpy.ndarray): amount of time each data point is to be shifted
(must be of same dimension as data)
fs (double): sampling frequency in (Hz)
order (int): interpolation order
"""
logging.debug("Time shifting data '%s' by '%s' (order=%d)", data, shift, order)
if numpy.isscalar(data):
logging.debug("Data is a constant scalar and cannot be time shifted")
return data
if numpy.all(shift == 0):
logging.debug("Time shift is vanishing and cannot be applied")
return data
data = numpy.asarray(data)
mode = "timeseries" if isinstance(shift, numpy.ndarray) else "constant"
logging.debug("Using mode '%s'", mode)
if mode == "timeseries" and data.size != shift.size:
raise ValueError(f"`data` and `tau` must be of the same size (got {data.size}, {shift.size})")
num_tabs = order + 1
p = num_tabs // 2 # pylint: disable=invalid-name
size = data.size
def lagrange_coeffs(eps):
"""Calculate coefficients for lagrange interpolation"""
coeffs = numpy.zeros([num_tabs, eps.size])
if p > 1:
factor = numpy.ones(eps.size, dtype=numpy.float64)
factor *= eps * (1 - eps)
for j in range(1, p):
factor *= (-1) * (1 - j / p) / (1 + j / p)
coeffs[p - 1 - j] = factor / (j + eps)
coeffs[p + j] = factor / (j + 1 - eps)
coeffs[p - 1] = 1 - eps
coeffs[p] = eps
for j in range(2, p):
coeffs *= 1 - (eps / j)**2
coeffs *= (1 + eps) * (1 - eps / p)
else:
coeffs[p - 1] = 1 - eps
coeffs[p] = eps
return coeffs.T
k = numpy.floor(shift).astype(int)
eps = shift - k
coeffs = lagrange_coeffs(eps)
logging.debug("Using Lagrange coefficiens '%s'", coeffs)
if mode == "timeseries":
logging.debug("Computing Lagrange matrix")
indices = numpy.arange(size)
i_min = numpy.min(k - (p - 1) + indices)
i_max = numpy.max(k + p + indices + 1)
csr_ind = numpy.tile(numpy.arange(num_tabs), size) + numpy.repeat(k + indices, num_tabs) - (p - 1)
csr_ptr = num_tabs * numpy.arange(size + 1)
mat = scipy.sparse.csr_matrix((numpy.ravel(coeffs), csr_ind - i_min, csr_ptr), shape=(size, i_max - i_min))
logging.debug("Padding data (left=%d, right=%d)", max(0, -i_min), max(0, i_max - size))
data_padded = numpy.pad(data[max(0, i_min):min(size, i_max)], (max(0, -i_min), max(0, i_max - size)))
logging.debug("Computing matrix-vector product")
shifted = mat.dot(data_padded)
elif mode == "constant":
i_min = k - (p - 1)
i_max = k + p + size
logging.debug("Padding data (left=%d, right=%d)", max(0, -i_min), max(0, i_max - size))
data_padded = numpy.pad(data[max(0, i_min):min(size, i_max)], (max(0, -i_min), max(0, i_max - size)))
logging.debug("Computing correlation product")
shifted = numpy.correlate(data_padded, coeffs[0], mode="valid")
return shifted
This diff is collapsed.
astroid==2.4.2
attrs==20.3.0
cycler==0.10.0
h5py==3.1.0
iniconfig==1.1.1
isort==5.7.0
kiwisolver==1.3.1
lazy-object-proxy==1.4.3
lisaconstants==0.0.3
matplotlib==3.3.3
mccabe==0.6.1
numpy==1.19.5
packaging==20.8
Pillow==8.1.0
pluggy==0.13.1
py==1.10.0
pylint==2.6.0
pyparsing==2.4.7
pyplnoise==1.3
pytest==6.2.1
python-dateutil==2.8.1
scipy==1.6.0
six==1.15.0
toml==0.10.2
wrapt==1.12.1
git+https://lisainstrument-ci:kSRBLBj7s3R7zfgnQiZx@gitlab.in2p3.fr/lisa-simulation/python-constants.git@v0.0.3
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment