From a7faad92702bfcf205a897852b5605f04a832a80 Mon Sep 17 00:00:00 2001
From: Jean-Baptiste Bayle <j2b.bayle@gmail.com>
Date: Wed, 21 Sep 2022 20:43:33 +0100
Subject: [PATCH] Use concurrency for noise generation and interpolations

---
 lisainstrument/instrument.py | 181 ++++++++++++++++++++++-------------
 1 file changed, 114 insertions(+), 67 deletions(-)

diff --git a/lisainstrument/instrument.py b/lisainstrument/instrument.py
index f9a385a..45953ed 100755
--- a/lisainstrument/instrument.py
+++ b/lisainstrument/instrument.py
@@ -951,7 +951,7 @@ class Instrument:
         self.distant_carrier_offsets = \
             -self.d_pprs * self.central_freq \
             + (1 - self.d_pprs) * self.local_carrier_offsets.distant() \
-            .transformed(lambda mosa, x: self.interpolate(x, -self.pprs[mosa]))
+            .transformed(lambda mosa, x: self.interpolate(x, -self.pprs[mosa]), concurrent=True)
 
         logger.debug("Propagating carrier fluctuations to distant MOSAs")
         carrier_fluctuations = \
@@ -959,7 +959,7 @@ class Instrument:
             - (self.central_freq + self.local_carrier_offsets) * self.distant_ttls / c
         propagated_carrier_fluctuations = \
             (1 - self.d_pprs) * carrier_fluctuations.distant() \
-            .transformed(lambda mosa, x: self.interpolate(x, -self.pprs[mosa]))
+            .transformed(lambda mosa, x: self.interpolate(x, -self.pprs[mosa]), concurrent=True)
         self.distant_carrier_fluctuations = \
             propagated_carrier_fluctuations \
             - (self.central_freq + self.local_carrier_offsets) * self.gws \
@@ -969,7 +969,7 @@ class Instrument:
         self.distant_usb_offsets = \
             -self.d_pprs * self.central_freq \
             + (1 - self.d_pprs) * self.local_usb_offsets.distant() \
-            .transformed(lambda mosa, x: self.interpolate(x, -self.pprs[mosa]))
+            .transformed(lambda mosa, x: self.interpolate(x, -self.pprs[mosa]), concurrent=True)
 
         logger.debug("Propagating upper sideband fluctuations to distant MOSAs")
         usb_fluctuations = \
@@ -977,7 +977,7 @@ class Instrument:
             - (self.central_freq + self.local_usb_offsets) * self.distant_ttls / c
         propagated_usb_fluctuations = \
             (1 - self.d_pprs) * usb_fluctuations.distant() \
-            .transformed(lambda mosa, x: self.interpolate(x, -self.pprs[mosa]))
+            .transformed(lambda mosa, x: self.interpolate(x, -self.pprs[mosa]), concurrent=True)
         self.distant_usb_fluctuations = \
             propagated_usb_fluctuations \
             - (self.central_freq + self.local_usb_offsets) * self.gws \
@@ -1195,7 +1195,7 @@ class Instrument:
 
         logger.info("Inverting THE with respect to TPS")
         self.tps_wrt_the = self.the_wrt_tps_local \
-            .transformed(lambda sc, x: self.invert_the_wrt_tps(x, sc))
+            .transformed(lambda sc, x: self.invert_the_wrt_tps(x, sc), concurrent=True)
 
         self.timestamped = \
             lambda mosa, x: self.interpolate(x, -self.tps_wrt_the.for_each_mosa()[mosa])
@@ -1205,85 +1205,85 @@ class Instrument:
         logger.debug("Sampling inter-spacecraft carrier beatnote fluctuations to THE grid")
         self.the_isi_carrier_offsets = (
             self.tps_isi_carrier_offsets / (1 + self.clock_noise_offsets)
-        ).transformed(self.timestamped)
+        ).transformed(self.timestamped, concurrent=True)
 
         logger.debug("Sampling inter-spacecraft carrier beatnote fluctuations to THE grid")
         self.the_isi_carrier_fluctuations = (
             self.tps_isi_carrier_fluctuations / (1 + self.clock_noise_offsets)
                 - self.tps_isi_carrier_offsets * self.clock_noise_fluctuations
                 / (1 + self.clock_noise_offsets)**2
-        ).transformed(self.timestamped)
+        ).transformed(self.timestamped, concurrent=True)
 
         logger.debug("Sampling inter-spacecraft upper sideband beatnote offsets to THE grid")
         self.the_isi_usb_offsets = (
             self.tps_isi_usb_offsets / (1 + self.clock_noise_offsets)
-        ).transformed(self.timestamped)
+        ).transformed(self.timestamped, concurrent=True)
 
         logger.debug("Sampling inter-spacecraft upper sideband beatnote fluctuations to THE grid")
         self.the_isi_usb_fluctuations = (
             self.tps_isi_usb_fluctuations / (1 + self.clock_noise_offsets)
                 - self.tps_isi_usb_offsets * self.clock_noise_fluctuations
                 / (1 + self.clock_noise_offsets)**2
-        ).transformed(self.timestamped)
+        ).transformed(self.timestamped, concurrent=True)
 
         logger.debug("Sampling inter-spacecraft DWS measurements to THE grid")
-        self.the_isi_dws_phis = self.tps_isi_dws_phis.transformed(self.timestamped)
-        self.the_isi_dws_etas = self.tps_isi_dws_etas.transformed(self.timestamped)
+        self.the_isi_dws_phis = self.tps_isi_dws_phis.transformed(self.timestamped, concurrent=True)
+        self.the_isi_dws_etas = self.tps_isi_dws_etas.transformed(self.timestamped, concurrent=True)
 
         logger.info("Sampling measured pseudo-ranges to THE grid")
-        self.the_mprs = self.tps_mprs.transformed(self.timestamped)
+        self.the_mprs = self.tps_mprs.transformed(self.timestamped, concurrent=True)
 
         logger.info("Sampling test-mass beatnotes to THE grid")
 
         logger.debug("Sampling test-mass carrier beatnote offsets to THE grid")
         self.the_tmi_carrier_offsets = (
             self.tps_tmi_carrier_offsets / (1 + self.clock_noise_offsets)
-        ).transformed(self.timestamped)
+        ).transformed(self.timestamped, concurrent=True)
 
         logger.debug("Sampling test-mass carrier beatnote fluctuations to THE grid")
         self.the_tmi_carrier_fluctuations = (
             self.tps_tmi_carrier_fluctuations / (1 + self.clock_noise_offsets)
                 - self.tps_tmi_carrier_offsets * self.clock_noise_fluctuations
                 / (1 + self.clock_noise_offsets)**2
-        ).transformed(self.timestamped)
+        ).transformed(self.timestamped, concurrent=True)
 
         logger.debug("Sampling test-mass upper sideband beatnote offsets to THE grid")
         self.the_tmi_usb_offsets = (
             self.tps_tmi_usb_offsets / (1 + self.clock_noise_offsets)
-        ).transformed(self.timestamped)
+        ).transformed(self.timestamped, concurrent=True)
 
         logger.debug("Sampling test-mass upper sideband beatnote fluctuations to THE grid")
         self.the_tmi_usb_fluctuations = (
             self.tps_tmi_usb_fluctuations / (1 + self.clock_noise_offsets)
                 - self.tps_tmi_usb_offsets * self.clock_noise_fluctuations
                 / (1 + self.clock_noise_offsets)**2
-        ).transformed(self.timestamped)
+        ).transformed(self.timestamped, concurrent=True)
 
         logger.info("Sampling reference beatnotes to THE grid")
 
         logger.debug("Sampling reference carrier beatnote offsets to THE grid")
         self.the_rfi_carrier_offsets = (
             self.tps_rfi_carrier_offsets / (1 + self.clock_noise_offsets)
-        ).transformed(self.timestamped)
+        ).transformed(self.timestamped, concurrent=True)
 
         logger.debug("Sampling reference carrier beatnote fluctuations to THE grid")
         self.the_rfi_carrier_fluctuations = (
             self.tps_rfi_carrier_fluctuations / (1 + self.clock_noise_offsets)
                 - self.tps_rfi_carrier_offsets * self.clock_noise_fluctuations
                 / (1 + self.clock_noise_offsets)**2
-        ).transformed(self.timestamped)
+        ).transformed(self.timestamped, concurrent=True)
 
         logger.debug("Sampling reference upper sideband beatnote offsets to THE grid")
         self.the_rfi_usb_offsets = (
             self.tps_rfi_usb_offsets / (1 + self.clock_noise_offsets)
-        ).transformed(self.timestamped)
+        ).transformed(self.timestamped, concurrent=True)
 
         logger.debug("Sampling reference upper sideband beatnote fluctuations to THE grid")
         self.the_rfi_usb_fluctuations = (
             self.tps_rfi_usb_fluctuations / (1 + self.clock_noise_offsets)
                 - self.tps_rfi_usb_offsets * self.clock_noise_fluctuations
                 / (1 + self.clock_noise_offsets)**2
-        ).transformed(self.timestamped)
+        ).transformed(self.timestamped, concurrent=True)
 
         # Electronic delays
 
@@ -1293,51 +1293,78 @@ class Instrument:
         self.electro_rfi = lambda mosa, x: self.interpolate(x, -self.electro_delays_rfis[mosa])
 
         logger.debug("Applying electronic delays to inter-spacecraft beatnotes")
-        self.electro_isi_carrier_offsets = self.the_isi_carrier_offsets.transformed(self.electro_isi)
-        self.electro_isi_carrier_fluctuations = self.the_isi_carrier_fluctuations.transformed(self.electro_isi)
-        self.electro_isi_usb_offsets = self.the_isi_usb_offsets.transformed(self.electro_isi)
-        self.electro_isi_usb_fluctuations = self.the_isi_usb_fluctuations.transformed(self.electro_isi)
+        self.electro_isi_carrier_offsets = self.the_isi_carrier_offsets \
+            .transformed(self.electro_isi, concurrent=True)
+        self.electro_isi_carrier_fluctuations = self.the_isi_carrier_fluctuations \
+            .transformed(self.electro_isi, concurrent=True)
+        self.electro_isi_usb_offsets = self.the_isi_usb_offsets \
+            .transformed(self.electro_isi, concurrent=True)
+        self.electro_isi_usb_fluctuations = self.the_isi_usb_fluctuations \
+            .transformed(self.electro_isi, concurrent=True)
 
         logger.debug("Applying electronic delays to test-mass beatnotes")
-        self.electro_tmi_carrier_offsets = self.the_tmi_carrier_offsets.transformed(self.electro_tmi)
-        self.electro_tmi_carrier_fluctuations = self.the_tmi_carrier_fluctuations.transformed(self.electro_tmi)
-        self.electro_tmi_usb_offsets = self.the_tmi_usb_offsets.transformed(self.electro_tmi)
-        self.electro_tmi_usb_fluctuations = self.the_tmi_usb_fluctuations.transformed(self.electro_tmi)
+        self.electro_tmi_carrier_offsets = self.the_tmi_carrier_offsets \
+            .transformed(self.electro_tmi, concurrent=True)
+        self.electro_tmi_carrier_fluctuations = self.the_tmi_carrier_fluctuations \
+            .transformed(self.electro_tmi, concurrent=True)
+        self.electro_tmi_usb_offsets = self.the_tmi_usb_offsets \
+            .transformed(self.electro_tmi, concurrent=True)
+        self.electro_tmi_usb_fluctuations = self.the_tmi_usb_fluctuations \
+            .transformed(self.electro_tmi, concurrent=True)
 
         logger.debug("Applying electronic delays to reference beatnotes")
-        self.electro_rfi_carrier_offsets = self.the_rfi_carrier_offsets.transformed(self.electro_rfi)
-        self.electro_rfi_carrier_fluctuations = self.the_rfi_carrier_fluctuations.transformed(self.electro_rfi)
-        self.electro_rfi_usb_offsets = self.the_rfi_usb_offsets.transformed(self.electro_rfi)
-        self.electro_rfi_usb_fluctuations = self.the_rfi_usb_fluctuations.transformed(self.electro_rfi)
+        self.electro_rfi_carrier_offsets = self.the_rfi_carrier_offsets \
+            .transformed(self.electro_rfi, concurrent=True)
+        self.electro_rfi_carrier_fluctuations = self.the_rfi_carrier_fluctuations \
+            .transformed(self.electro_rfi, concurrent=True)
+        self.electro_rfi_usb_offsets = self.the_rfi_usb_offsets \
+            .transformed(self.electro_rfi, concurrent=True)
+        self.electro_rfi_usb_fluctuations = self.the_rfi_usb_fluctuations \
+            .transformed(self.electro_rfi, concurrent=True)
 
         ## Antialiasing filtering
 
         logger.info("Filtering beatnotes")
 
         logger.debug("Filtering inter-spacecraft beatnotes")
-        self.filtered_isi_carrier_offsets = self.electro_isi_carrier_offsets.transformed(self.aafilter)
-        self.filtered_isi_carrier_fluctuations = self.electro_isi_carrier_fluctuations.transformed(self.aafilter)
-        self.filtered_isi_usb_offsets = self.electro_isi_usb_offsets.transformed(self.aafilter)
-        self.filtered_isi_usb_fluctuations = self.electro_isi_usb_fluctuations.transformed(self.aafilter)
+        self.filtered_isi_carrier_offsets = self.electro_isi_carrier_offsets \
+            .transformed(self.aafilter, concurrent=True)
+        self.filtered_isi_carrier_fluctuations = self.electro_isi_carrier_fluctuations \
+            .transformed(self.aafilter, concurrent=True)
+        self.filtered_isi_usb_offsets = self.electro_isi_usb_offsets \
+            .transformed(self.aafilter, concurrent=True)
+        self.filtered_isi_usb_fluctuations = self.electro_isi_usb_fluctuations \
+            .transformed(self.aafilter, concurrent=True)
 
         logger.debug("Filtering inter-spacecraft DWS measurements")
-        self.filtered_isi_dws_phis = self.the_isi_dws_phis.transformed(self.aafilter)
-        self.filtered_isi_dws_etas = self.the_isi_dws_etas.transformed(self.aafilter)
+        self.filtered_isi_dws_phis = self.the_isi_dws_phis \
+            .transformed(self.aafilter, concurrent=True)
+        self.filtered_isi_dws_etas = self.the_isi_dws_etas \
+            .transformed(self.aafilter, concurrent=True)
 
         logger.debug("Filtering measured pseudo-ranges")
-        self.filtered_mprs = self.the_mprs.transformed(self.aafilter)
+        self.filtered_mprs = self.the_mprs \
+            .transformed(self.aafilter, concurrent=True)
 
         logger.debug("Filtering test-mass beatnotes")
-        self.filtered_tmi_carrier_offsets = self.electro_tmi_carrier_offsets.transformed(self.aafilter)
-        self.filtered_tmi_carrier_fluctuations = self.electro_tmi_carrier_fluctuations.transformed(self.aafilter)
-        self.filtered_tmi_usb_offsets = self.electro_tmi_usb_offsets.transformed(self.aafilter)
-        self.filtered_tmi_usb_fluctuations = self.electro_tmi_usb_fluctuations.transformed(self.aafilter)
+        self.filtered_tmi_carrier_offsets = self.electro_tmi_carrier_offsets \
+            .transformed(self.aafilter, concurrent=True)
+        self.filtered_tmi_carrier_fluctuations = self.electro_tmi_carrier_fluctuations \
+            .transformed(self.aafilter, concurrent=True)
+        self.filtered_tmi_usb_offsets = self.electro_tmi_usb_offsets \
+            .transformed(self.aafilter, concurrent=True)
+        self.filtered_tmi_usb_fluctuations = self.electro_tmi_usb_fluctuations \
+            .transformed(self.aafilter, concurrent=True)
 
         logger.debug("Filtering reference beatnotes")
-        self.filtered_rfi_carrier_offsets = self.electro_rfi_carrier_offsets.transformed(self.aafilter)
-        self.filtered_rfi_carrier_fluctuations = self.electro_rfi_carrier_fluctuations.transformed(self.aafilter)
-        self.filtered_rfi_usb_offsets = self.electro_rfi_usb_offsets.transformed(self.aafilter)
-        self.filtered_rfi_usb_fluctuations = self.electro_rfi_usb_fluctuations.transformed(self.aafilter)
+        self.filtered_rfi_carrier_offsets = self.electro_rfi_carrier_offsets \
+            .transformed(self.aafilter, concurrent=True)
+        self.filtered_rfi_carrier_fluctuations = self.electro_rfi_carrier_fluctuations \
+            .transformed(self.aafilter, concurrent=True)
+        self.filtered_rfi_usb_offsets = self.electro_rfi_usb_offsets \
+            .transformed(self.aafilter, concurrent=True)
+        self.filtered_rfi_usb_fluctuations = self.electro_rfi_usb_fluctuations \
+            .transformed(self.aafilter, concurrent=True)
 
         ## Downsampling filtering
 
@@ -1435,7 +1462,8 @@ class Instrument:
             noises.clock(
                 self.physics_fs,
                 self.physics_size + self.initial_telemetry_physics_size,
-                self.clock_asds[sc])
+                self.clock_asds[sc]),
+            concurrent=True
         )
 
         # Slice to only select physics period
@@ -1451,7 +1479,8 @@ class Instrument:
                 cumulative_trapezoid(np.broadcast_to(
                     self.clock_noise_fluctuations_withinitial[sc],
                     self.physics_size + self.initial_telemetry_physics_size),
-                dx=self.physics_dt, initial=0)
+                dx=self.physics_dt, initial=0),
+                concurrent=True
             )
 
         # Slice to only select physics period
@@ -1464,7 +1493,8 @@ class Instrument:
 
         logger.info("Generating modulation noise")
         self.modulation_noises = ForEachMOSA(lambda mosa:
-            noises.modulation(self.physics_fs, self.physics_size, self.modulation_asds[mosa])
+            noises.modulation(self.physics_fs, self.physics_size, self.modulation_asds[mosa]),
+            concurrent=True
         )
 
         ## Backlink noise
@@ -1472,7 +1502,8 @@ class Instrument:
         logger.info("Generating backlink noise")
         self.backlink_noises = ForEachMOSA(lambda mosa:
             noises.backlink(self.physics_fs, self.physics_size,
-                self.backlink_asds[mosa], self.backlink_fknees[mosa])
+                self.backlink_asds[mosa], self.backlink_fknees[mosa]),
+            concurrent=True
         )
 
         ## Test-mass acceleration noise
@@ -1480,7 +1511,8 @@ class Instrument:
         logger.info("Generating test-mass acceleration noise")
         self.testmass_noises = ForEachMOSA(lambda mosa:
             noises.testmass(self.physics_fs, self.physics_size,
-                self.testmass_asds[mosa], self.testmass_fknees[mosa])
+                self.testmass_asds[mosa], self.testmass_fknees[mosa]),
+            concurrent=True
         )
 
         ## Ranging noise
@@ -1488,7 +1520,8 @@ class Instrument:
         logger.info("Generating ranging noise")
         self.ranging_noises = ForEachMOSA(lambda mosa:
             self.ranging_biases[mosa] + noises.ranging(self.physics_fs,
-                self.physics_size, self.ranging_asds[mosa])
+                self.physics_size, self.ranging_asds[mosa]),
+            concurrent=True
         )
 
         ## OMS noise
@@ -1497,32 +1530,38 @@ class Instrument:
 
         self.oms_isi_carrier_noises = ForEachMOSA(lambda mosa:
             noises.oms(self.physics_fs, self.physics_size,
-                self.oms_isi_carrier_asds[mosa], self.oms_fknees[mosa])
+                self.oms_isi_carrier_asds[mosa], self.oms_fknees[mosa]),
+            concurrent=True
         )
 
         self.oms_isi_usb_noises = ForEachMOSA(lambda mosa:
             noises.oms(self.physics_fs, self.physics_size,
-                self.oms_isi_usb_asds[mosa], self.oms_fknees[mosa])
+                self.oms_isi_usb_asds[mosa], self.oms_fknees[mosa]),
+            concurrent=True
         )
 
         self.oms_tmi_carrier_noises = ForEachMOSA(lambda mosa:
             noises.oms(self.physics_fs, self.physics_size,
-                self.oms_tmi_carrier_asds[mosa], self.oms_fknees[mosa])
+                self.oms_tmi_carrier_asds[mosa], self.oms_fknees[mosa]),
+            concurrent=True
         )
 
         self.oms_tmi_usb_noises = ForEachMOSA(lambda mosa:
             noises.oms(self.physics_fs, self.physics_size,
-                self.oms_tmi_usb_asds[mosa], self.oms_fknees[mosa])
+                self.oms_tmi_usb_asds[mosa], self.oms_fknees[mosa]),
+            concurrent=True
         )
 
         self.oms_rfi_carrier_noises = ForEachMOSA(lambda mosa:
             noises.oms(self.physics_fs, self.physics_size,
-                self.oms_rfi_carrier_asds[mosa], self.oms_fknees[mosa])
+                self.oms_rfi_carrier_asds[mosa], self.oms_fknees[mosa]),
+            concurrent=True
         )
 
         self.oms_rfi_usb_noises = ForEachMOSA(lambda mosa:
             noises.oms(self.physics_fs, self.physics_size,
-                self.oms_rfi_usb_asds[mosa], self.oms_fknees[mosa])
+                self.oms_rfi_usb_asds[mosa], self.oms_fknees[mosa]),
+            concurrent=True
         )
 
         ## DWS measurement noise
@@ -1530,10 +1569,12 @@ class Instrument:
         logger.info("Generating DWS measurement noise")
 
         self.dws_phi_noises = ForEachMOSA(lambda mosa:
-            noises.dws(self.physics_fs, self.physics_size, self.dws_asds[mosa])
+            noises.dws(self.physics_fs, self.physics_size, self.dws_asds[mosa]),
+            concurrent=True
         )
         self.dws_eta_noises = ForEachMOSA(lambda mosa:
-            noises.dws(self.physics_fs, self.physics_size, self.dws_asds[mosa])
+            noises.dws(self.physics_fs, self.physics_size, self.dws_asds[mosa]),
+            concurrent=True
         )
 
         ## MOC time correlation noise
@@ -1542,7 +1583,8 @@ class Instrument:
             noises.moc_time_correlation(
                 self.telemetry_fs,
                 self.telemetry_size,
-                self.moc_time_correlation_asds[sc])
+                self.moc_time_correlation_asds[sc]),
+            concurrent=True
         )
 
         ## Angular jitters
@@ -1551,26 +1593,31 @@ class Instrument:
 
         self.sc_jitter_phis = ForEachSC(lambda sc:
             noises.jitter(self.physics_fs, self.physics_size,
-                self.sc_jitter_phi_asds[sc], self.sc_jitter_phi_fknees[sc])
+                self.sc_jitter_phi_asds[sc], self.sc_jitter_phi_fknees[sc]),
+            concurrent=True
         )
         self.sc_jitter_etas = ForEachSC(lambda sc:
             noises.jitter(self.physics_fs, self.physics_size,
-                self.sc_jitter_eta_asds[sc], self.sc_jitter_eta_fknees[sc])
+                self.sc_jitter_eta_asds[sc], self.sc_jitter_eta_fknees[sc]),
+            concurrent=True
         )
         self.sc_jitter_thetas = ForEachSC(lambda sc:
             noises.jitter(self.physics_fs, self.physics_size,
-                self.sc_jitter_theta_asds[sc], self.sc_jitter_theta_fknees[sc])
+                self.sc_jitter_theta_asds[sc], self.sc_jitter_theta_fknees[sc]),
+            concurrent=True
         )
 
         logger.info("Generating MOSA angular jitters")
 
         self.mosa_jitter_phis = ForEachMOSA(lambda mosa:
             noises.jitter(self.physics_fs, self.physics_size,
-                self.mosa_jitter_phi_asds[mosa], self.mosa_jitter_phi_fknees[mosa])
+                self.mosa_jitter_phi_asds[mosa], self.mosa_jitter_phi_fknees[mosa]),
+            concurrent=True
         )
         self.mosa_jitter_etas = ForEachMOSA(lambda mosa:
             noises.jitter(self.physics_fs, self.physics_size,
-                self.mosa_jitter_eta_asds[mosa], self.mosa_jitter_eta_fknees[mosa])
+                self.mosa_jitter_eta_asds[mosa], self.mosa_jitter_eta_fknees[mosa]),
+            concurrent=True
         )
 
         logger.info("Computing MOSA total angular jitters")
-- 
GitLab