From 958239e4ddc8c038d4438e1f0585cd49e4a1b60f Mon Sep 17 00:00:00 2001
From: Georg Raiser
Date: Mon, 13 Jan 2025 19:00:24 +0000
Subject: [PATCH 1/3] spikeglx streamer bugfix

---
 brainbox/io/spikeglx.py | 65 ++++++++++++++++++++++++-----------------
 1 file changed, 39 insertions(+), 26 deletions(-)

diff --git a/brainbox/io/spikeglx.py b/brainbox/io/spikeglx.py
index f9c91da73..043c3cf67 100644
--- a/brainbox/io/spikeglx.py
+++ b/brainbox/io/spikeglx.py
@@ -8,6 +8,7 @@
 import numpy as np
 
 from one.alf.path import remove_uuid_string
+from one.alf.spec import is_uuid_string
 
 import spikeglx
 
@@ -79,33 +80,32 @@ def extract_waveforms(ephys_file, ts, ch, t=2.0, sr=30000, n_ch_probe=385, car=T
     ch = np.asarray(ch)
     ch = ch.reshape((ch.size, 1)) if ch.size == 1 else ch
     if np.any(ch < 0) or np.any(ch > n_ch_probe):
-        raise Exception('At least one specified channel number is impossible. '
-                        f'The minimum channel number was {np.min(ch)}, '
-                        f'and the maximum channel number was {np.max(ch)}. '
-                        'Check specified channel numbers and try again.')
+        raise Exception(
+            'At least one specified channel number is impossible. '
+            f'The minimum channel number was {np.min(ch)}, '
+            f'and the maximum channel number was {np.max(ch)}. '
+            'Check specified channel numbers and try again.'
+        )
 
     if car:  # compute spatial noise in chunks
         # see https://github.com/int-brain-lab/iblenv/issues/5
-        raise NotImplementedError("CAR option is not available")
+        raise NotImplementedError('CAR option is not available')
     # Initialize `waveforms`, extract waveforms from `file_m`, and CAR.
     waveforms = np.zeros((len(ts), 2 * n_wf_samples, ch.size))
     # Give time estimate for extracting waveforms.
     t0 = time.perf_counter()
     for i in range(5):
-        waveforms[i, :, :] = \
-            file_m[i * n_wf_samples * 2 + t_sample_first:
-                   i * n_wf_samples * 2 + t_sample_first + n_wf_samples * 2, ch].reshape(
-                (n_wf_samples * 2, ch.size))
+        waveforms[i, :, :] = file_m[
+            i * n_wf_samples * 2 + t_sample_first : i * n_wf_samples * 2 + t_sample_first + n_wf_samples * 2, ch
+        ].reshape((n_wf_samples * 2, ch.size))
     dt = time.perf_counter() - t0
-    print('Performing waveform extraction. Estimated time is {:.2f} mins. ({})'
-          .format(dt * len(ts) / 60 / 5, time.ctime()))
+    print('Performing waveform extraction. Estimated time is {:.2f} mins. ({})'.format(dt * len(ts) / 60 / 5, time.ctime()))
 
     for spk, _ in enumerate(ts):  # extract waveforms
         spk_ts_sample = ts_samples[spk]
         spk_samples = np.arange(spk_ts_sample - n_wf_samples, spk_ts_sample + n_wf_samples)
         # have to reshape to add an axis to broadcast `file_m` into `waveforms`
-        waveforms[spk, :, :] = \
-            file_m[spk_samples[0]:spk_samples[-1] + 1, ch].reshape((spk_samples.size, ch.size))
+        waveforms[spk, :, :] = file_m[spk_samples[0] : spk_samples[-1] + 1, ch].reshape((spk_samples.size, ch.size))
     print('Done. ({})'.format(time.ctime()))
 
     return waveforms
 
@@ -118,6 +118,7 @@ class Streamer(spikeglx.Reader):
         sr = Streamer(pid=pid, one=one)
         raw_voltage = sr[int(t0 * sr.fs):int((t0 + nsecs) * sr.fs), :]
     """
+
     def __init__(self, pid, one, typ='ap', cache_folder=None, remove_cached=False):
         self.target_dir = None  # last chunk directory download or read
         self.one = one
@@ -125,15 +126,25 @@ def __init__(self, pid, one, typ='ap', cache_folder=None, remove_cached=False):
         self.cache_folder = cache_folder or Path(self.one.alyx._par.CACHE_DIR).joinpath('cache', typ)
         self.remove_cached = remove_cached
         self.eid, self.pname = self.one.pid2eid(pid)
-        self.file_chunks = self.one.load_dataset(self.eid, f'*.{typ}.ch', collection=f"*{self.pname}")
-        meta_file = self.one.load_dataset(self.eid, f'*.{typ}.meta', collection=f"*{self.pname}")
-        cbin_rec = self.one.list_datasets(self.eid, collection=f"*{self.pname}", filename=f'*{typ}.*bin', details=True)
+        self.file_chunks = self.one.load_dataset(self.eid, f'*.{typ}.ch', collection=f'*{self.pname}')
+        sglx_file = self.one.list_datasets(self.eid, f'*.{typ}.cbin', collection=f'*{self.pname}')
+        assert len(sglx_file) == 1
+        sglx_file = sglx_file[0]
+        meta_file = self.one.load_dataset(self.eid, f'*.{typ}.meta', collection=f'*{self.pname}')
+        # check if an uuid is part of meta_file, and if so, add the corresponding uuid to the sglx_file
+        if is_uuid_string(meta_file.name.split('.')[-2]):
+            parts = Path(sglx_file).name.split('.')
+            parts.insert(-1, '*')
+            pattern = '.'.join(parts)
+            sglx_file = list(meta_file.parent.glob(pattern))[0]
+        sglx_file = self.one.eid2path(self.eid) / sglx_file
+        cbin_rec = self.one.list_datasets(self.eid, collection=f'*{self.pname}', filename=f'*{typ}.*bin', details=True)
         cbin_rec.index = cbin_rec.index.map(lambda x: (self.eid, x))
         self.url_cbin = self.one.record2url(cbin_rec)[0]
         with open(self.file_chunks, 'r') as f:
             self.chunks = json.load(f)
             self.chunks['chunk_bounds'] = np.array(self.chunks['chunk_bounds'])
-        super(Streamer, self).__init__(meta_file, ignore_warnings=True)
+        super(Streamer, self).__init__(sglx_file=sglx_file, meta_file=meta_file, ignore_warnings=True)
 
     def read(self, nsel=slice(0, 10000), csel=slice(None), sync=True, volts=True):
         """
@@ -146,9 +157,9 @@ def read(self, nsel=slice(0, 10000), csel=slice(None), sync=True, volts=True):
         self.cache_folder.mkdir(exist_ok=True, parents=True)
         sr, file_cbin = self._download_raw_partial(first_chunk=first_chunk, last_chunk=last_chunk)
         if not volts:
-            data = np.copy(sr._raw[nsel.start - n0:nsel.stop - n0, csel])
+            data = np.copy(sr._raw[nsel.start - n0 : nsel.stop - n0, csel])
         else:
-            data = sr[nsel.start - n0: nsel.stop - n0, csel]
+            data = sr[nsel.start - n0 : nsel.stop - n0, csel]
         sr.close()
         if self.remove_cached:
             shutil.rmtree(self.target_dir)
@@ -167,7 +178,7 @@ def _download_raw_partial(self, first_chunk=0, last_chunk=0):
         webclient = self.one.alyx
         relpath = Path(self.url_cbin.replace(webclient._par.HTTP_DATA_SERVER, '.')).parents[0]
         # write the temp file into a subdirectory
-        tdir_chunk = f"chunk_{str(first_chunk).zfill(6)}_to_{str(last_chunk).zfill(6)}"
+        tdir_chunk = f'chunk_{str(first_chunk).zfill(6)}_to_{str(last_chunk).zfill(6)}'
         # for parallel processes, there is a risk of collisions if the removed cached flag is set to True
         # if the folder is to be removed append a unique identifier to avoid having duplicate names
         if self.remove_cached:
@@ -190,8 +201,10 @@ def _download_raw_partial(self, first_chunk=0, last_chunk=0):
         if ch_file_stream.exists() and ch_file_stream.with_suffix('.cbin').exists():
             with open(ch_file_stream, 'r') as f:
                 cmeta_stream = json.load(f)
-            if (cmeta_stream.get('chopped_first_sample', None) == i0 and
-                    cmeta_stream.get('chopped_total_samples', None) == total_samples):
+            if (
+                cmeta_stream.get('chopped_first_sample', None) == i0
+                and cmeta_stream.get('chopped_total_samples', None) == total_samples
+            ):
                 return spikeglx.Reader(ch_file_stream.with_suffix('.cbin'), ignore_warnings=True), ch_file_stream
 
         else:
@@ -200,13 +213,13 @@ def _download_raw_partial(self, first_chunk=0, last_chunk=0):
         cmeta = self.chunks.copy()
 
         # prepare the metadata file
-        cmeta['chunk_bounds'] = cmeta['chunk_bounds'][first_chunk:last_chunk + 2]
+        cmeta['chunk_bounds'] = cmeta['chunk_bounds'][first_chunk : last_chunk + 2]
         cmeta['chunk_bounds'] = [int(_ - i0) for _ in cmeta['chunk_bounds']]
         assert len(cmeta['chunk_bounds']) >= 2
         assert cmeta['chunk_bounds'][0] == 0
 
         first_byte = cmeta['chunk_offsets'][first_chunk]
-        cmeta['chunk_offsets'] = cmeta['chunk_offsets'][first_chunk:last_chunk + 2]
+        cmeta['chunk_offsets'] = cmeta['chunk_offsets'][first_chunk : last_chunk + 2]
         cmeta['chunk_offsets'] = [_ - first_byte for _ in cmeta['chunk_offsets']]
         assert len(cmeta['chunk_offsets']) >= 2
         assert cmeta['chunk_offsets'][0] == 0
@@ -229,8 +242,8 @@ def _download_raw_partial(self, first_chunk=0, last_chunk=0):
         while True:
             try:
                 cbin_local_path = webclient.download_file(
-                    self.url_cbin, chunks=(first_byte, n_bytes),
-                    target_dir=self.target_dir, clobber=True, return_md5=False)
+                    self.url_cbin, chunks=(first_byte, n_bytes), target_dir=self.target_dir, clobber=True, return_md5=False
+                )
                 break
             except Exception as e:
                 retries += 1

From 012d787ff568c40b503893d9889fc88c823a7bf6 Mon Sep 17 00:00:00 2001
From: Georg Raiser
Date: Mon, 13 Jan 2025 19:06:25 +0000
Subject: [PATCH 2/3] un-ruff

---
 brainbox/io/spikeglx.py | 56 ++++++++++++++++++++---------------------
 1 file changed, 27 insertions(+), 29 deletions(-)

diff --git a/brainbox/io/spikeglx.py b/brainbox/io/spikeglx.py
index 043c3cf67..e2856e36c 100644
--- a/brainbox/io/spikeglx.py
+++ b/brainbox/io/spikeglx.py
@@ -80,32 +80,33 @@ def extract_waveforms(ephys_file, ts, ch, t=2.0, sr=30000, n_ch_probe=385, car=T
     ch = np.asarray(ch)
     ch = ch.reshape((ch.size, 1)) if ch.size == 1 else ch
     if np.any(ch < 0) or np.any(ch > n_ch_probe):
-        raise Exception(
-            'At least one specified channel number is impossible. '
-            f'The minimum channel number was {np.min(ch)}, '
-            f'and the maximum channel number was {np.max(ch)}. '
-            'Check specified channel numbers and try again.'
-        )
+        raise Exception('At least one specified channel number is impossible. '
+                        f'The minimum channel number was {np.min(ch)}, '
+                        f'and the maximum channel number was {np.max(ch)}. '
+                        'Check specified channel numbers and try again.')
 
     if car:  # compute spatial noise in chunks
         # see https://github.com/int-brain-lab/iblenv/issues/5
-        raise NotImplementedError('CAR option is not available')
+        raise NotImplementedError("CAR option is not available")
     # Initialize `waveforms`, extract waveforms from `file_m`, and CAR.
     waveforms = np.zeros((len(ts), 2 * n_wf_samples, ch.size))
     # Give time estimate for extracting waveforms.
     t0 = time.perf_counter()
     for i in range(5):
-        waveforms[i, :, :] = file_m[
-            i * n_wf_samples * 2 + t_sample_first : i * n_wf_samples * 2 + t_sample_first + n_wf_samples * 2, ch
-        ].reshape((n_wf_samples * 2, ch.size))
+        waveforms[i, :, :] = \
+            file_m[i * n_wf_samples * 2 + t_sample_first:
+                   i * n_wf_samples * 2 + t_sample_first + n_wf_samples * 2, ch].reshape(
+                (n_wf_samples * 2, ch.size))
     dt = time.perf_counter() - t0
-    print('Performing waveform extraction. Estimated time is {:.2f} mins. ({})'.format(dt * len(ts) / 60 / 5, time.ctime()))
+    print('Performing waveform extraction. Estimated time is {:.2f} mins. ({})'
+          .format(dt * len(ts) / 60 / 5, time.ctime()))
 
     for spk, _ in enumerate(ts):  # extract waveforms
         spk_ts_sample = ts_samples[spk]
         spk_samples = np.arange(spk_ts_sample - n_wf_samples, spk_ts_sample + n_wf_samples)
         # have to reshape to add an axis to broadcast `file_m` into `waveforms`
-        waveforms[spk, :, :] = file_m[spk_samples[0] : spk_samples[-1] + 1, ch].reshape((spk_samples.size, ch.size))
+        waveforms[spk, :, :] = \
+            file_m[spk_samples[0]:spk_samples[-1] + 1, ch].reshape((spk_samples.size, ch.size))
     print('Done. ({})'.format(time.ctime()))
 
     return waveforms
 
@@ -118,19 +119,18 @@ class Streamer(spikeglx.Reader):
         sr = Streamer(pid=pid, one=one)
         raw_voltage = sr[int(t0 * sr.fs):int((t0 + nsecs) * sr.fs), :]
     """
-
-    def __init__(self, pid, one, typ='ap', cache_folder=None, remove_cached=False):
+def __init__(self, pid, one, typ='ap', cache_folder=None, remove_cached=False):
         self.target_dir = None  # last chunk directory download or read
         self.one = one
         self.pid = pid
         self.cache_folder = cache_folder or Path(self.one.alyx._par.CACHE_DIR).joinpath('cache', typ)
         self.remove_cached = remove_cached
         self.eid, self.pname = self.one.pid2eid(pid)
-        self.file_chunks = self.one.load_dataset(self.eid, f'*.{typ}.ch', collection=f'*{self.pname}')
-        sglx_file = self.one.list_datasets(self.eid, f'*.{typ}.cbin', collection=f'*{self.pname}')
+        self.file_chunks = self.one.load_dataset(self.eid, f'*.{typ}.ch', collection=f"*{self.pname}")
+        sglx_file = self.one.list_datasets(self.eid, f'*.{typ}.cbin', collection=f"*{self.pname}")
         assert len(sglx_file) == 1
         sglx_file = sglx_file[0]
-        meta_file = self.one.load_dataset(self.eid, f'*.{typ}.meta', collection=f'*{self.pname}')
+        meta_file = self.one.load_dataset(self.eid, f'*.{typ}.meta', collection=f"*{self.pname}")
         # check if an uuid is part of meta_file, and if so, add the corresponding uuid to the sglx_file
         if is_uuid_string(meta_file.name.split('.')[-2]):
             parts = Path(sglx_file).name.split('.')
@@ -138,7 +138,7 @@ def __init__(self, pid, one, typ='ap', cache_folder=None, remove_cached=False):
             pattern = '.'.join(parts)
             sglx_file = list(meta_file.parent.glob(pattern))[0]
         sglx_file = self.one.eid2path(self.eid) / sglx_file
-        cbin_rec = self.one.list_datasets(self.eid, collection=f'*{self.pname}', filename=f'*{typ}.*bin', details=True)
+        cbin_rec = self.one.list_datasets(self.eid, collection=f"*{self.pname}", filename=f'*{typ}.*bin', details=True)
         cbin_rec.index = cbin_rec.index.map(lambda x: (self.eid, x))
         self.url_cbin = self.one.record2url(cbin_rec)[0]
         with open(self.file_chunks, 'r') as f:
@@ -157,9 +157,9 @@ def read(self, nsel=slice(0, 10000), csel=slice(None), sync=True, volts=True):
         self.cache_folder.mkdir(exist_ok=True, parents=True)
         sr, file_cbin = self._download_raw_partial(first_chunk=first_chunk, last_chunk=last_chunk)
         if not volts:
-            data = np.copy(sr._raw[nsel.start - n0 : nsel.stop - n0, csel])
+            data = np.copy(sr._raw[nsel.start - n0:nsel.stop - n0, csel])
         else:
-            data = sr[nsel.start - n0 : nsel.stop - n0, csel]
+            data = sr[nsel.start - n0: nsel.stop - n0, csel]
         sr.close()
         if self.remove_cached:
             shutil.rmtree(self.target_dir)
@@ -178,7 +178,7 @@ def _download_raw_partial(self, first_chunk=0, last_chunk=0):
         webclient = self.one.alyx
         relpath = Path(self.url_cbin.replace(webclient._par.HTTP_DATA_SERVER, '.')).parents[0]
         # write the temp file into a subdirectory
-        tdir_chunk = f'chunk_{str(first_chunk).zfill(6)}_to_{str(last_chunk).zfill(6)}'
+        tdir_chunk = f"chunk_{str(first_chunk).zfill(6)}_to_{str(last_chunk).zfill(6)}"
         # for parallel processes, there is a risk of collisions if the removed cached flag is set to True
         # if the folder is to be removed append a unique identifier to avoid having duplicate names
         if self.remove_cached:
@@ -201,10 +201,8 @@ def _download_raw_partial(self, first_chunk=0, last_chunk=0):
         if ch_file_stream.exists() and ch_file_stream.with_suffix('.cbin').exists():
             with open(ch_file_stream, 'r') as f:
                 cmeta_stream = json.load(f)
-            if (
-                cmeta_stream.get('chopped_first_sample', None) == i0
-                and cmeta_stream.get('chopped_total_samples', None) == total_samples
-            ):
+            if (cmeta_stream.get('chopped_first_sample', None) == i0 and
+                    cmeta_stream.get('chopped_total_samples', None) == total_samples):
                 return spikeglx.Reader(ch_file_stream.with_suffix('.cbin'), ignore_warnings=True), ch_file_stream
 
         else:
@@ -213,13 +211,13 @@ def _download_raw_partial(self, first_chunk=0, last_chunk=0):
         cmeta = self.chunks.copy()
 
         # prepare the metadata file
-        cmeta['chunk_bounds'] = cmeta['chunk_bounds'][first_chunk : last_chunk + 2]
+        cmeta['chunk_bounds'] = cmeta['chunk_bounds'][first_chunk:last_chunk + 2]
         cmeta['chunk_bounds'] = [int(_ - i0) for _ in cmeta['chunk_bounds']]
         assert len(cmeta['chunk_bounds']) >= 2
         assert cmeta['chunk_bounds'][0] == 0
 
         first_byte = cmeta['chunk_offsets'][first_chunk]
-        cmeta['chunk_offsets'] = cmeta['chunk_offsets'][first_chunk : last_chunk + 2]
+        cmeta['chunk_offsets'] = cmeta['chunk_offsets'][first_chunk:last_chunk + 2]
         cmeta['chunk_offsets'] = [_ - first_byte for _ in cmeta['chunk_offsets']]
         assert len(cmeta['chunk_offsets']) >= 2
         assert cmeta['chunk_offsets'][0] == 0
@@ -242,8 +240,8 @@ def _download_raw_partial(self, first_chunk=0, last_chunk=0):
         while True:
             try:
                 cbin_local_path = webclient.download_file(
-                    self.url_cbin, chunks=(first_byte, n_bytes), target_dir=self.target_dir, clobber=True, return_md5=False
-                )
+                    self.url_cbin, chunks=(first_byte, n_bytes),
+                    target_dir=self.target_dir, clobber=True, return_md5=False)
                 break
             except Exception as e:
                 retries += 1

From c3f72697543d2b40773f3612b7ad6e80fa5457f3 Mon Sep 17 00:00:00 2001
From: Georg Raiser
Date: Mon, 13 Jan 2025 20:06:42 +0000
Subject: [PATCH 3/3] typo fix

---
 brainbox/io/spikeglx.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/brainbox/io/spikeglx.py b/brainbox/io/spikeglx.py
index e2856e36c..199c5d4b8 100644
--- a/brainbox/io/spikeglx.py
+++ b/brainbox/io/spikeglx.py
@@ -119,7 +119,7 @@ class Streamer(spikeglx.Reader):
         sr = Streamer(pid=pid, one=one)
         raw_voltage = sr[int(t0 * sr.fs):int((t0 + nsecs) * sr.fs), :]
     """
-def __init__(self, pid, one, typ='ap', cache_folder=None, remove_cached=False):
+    def __init__(self, pid, one, typ='ap', cache_folder=None, remove_cached=False):
         self.target_dir = None  # last chunk directory download or read
         self.one = one
         self.pid = pid
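
Note on the fix in PATCH 1/3: per the added comment, when the meta filename carries a dataset UUID, the plain .cbin dataset name returned by one.list_datasets will not match the UUID-suffixed binary sitting next to the .meta file, so the patch globs for the matching cbin before handing both paths to spikeglx.Reader. Below is a minimal, self-contained sketch of just that resolution step; the session folder, the filenames, and the is_uuid_string stand-in are fabricated for illustration, and in the real Streamer.__init__ these names come from ONE queries and one.alf.spec as shown in the diff.

# Standalone sketch of the UUID-globbing step added in PATCH 1/3
# (hypothetical paths; is_uuid_string is a stand-in for one.alf.spec.is_uuid_string)
import tempfile
import uuid
from pathlib import Path


def is_uuid_string(s):
    # accept only a canonical hyphenated UUID string
    try:
        return str(uuid.UUID(s)) == s.lower()
    except ValueError:
        return False


# fabricated session folder holding UUID-suffixed raw ephys datasets
tmp = Path(tempfile.mkdtemp())
uid = '01234567-89ab-cdef-0123-456789abcdef'
meta_file = tmp / f'_spikeglx_ephysData_g0_t0.imec0.ap.{uid}.meta'
cbin_file = tmp / f'_spikeglx_ephysData_g0_t0.imec0.ap.{uid}.cbin'
meta_file.touch()
cbin_file.touch()

# the relative dataset name as listed by ONE carries no UUID
sglx_file = '_spikeglx_ephysData_g0_t0.imec0.ap.cbin'

# the patched logic: if the meta filename carries a UUID, glob for the
# cbin next to it with a wildcard in the UUID slot
if is_uuid_string(meta_file.name.split('.')[-2]):
    parts = Path(sglx_file).name.split('.')
    parts.insert(-1, '*')  # -> '_spikeglx_ephysData_g0_t0.imec0.ap.*.cbin'
    pattern = '.'.join(parts)
    sglx_file = list(meta_file.parent.glob(pattern))[0]

print(sglx_file.name)  # the UUID-suffixed cbin matching the meta file

With the binary resolved this way, the patch can pass sglx_file and meta_file explicitly to spikeglx.Reader via super().__init__, instead of constructing the Reader from the meta file alone as before.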