From 817fd70c0ace1c27a2c5eddd4844551add8e7c24 Mon Sep 17 00:00:00 2001 From: Heberto Mayorquin Date: Wed, 19 Nov 2025 16:29:02 -0600 Subject: [PATCH 01/15] draft --- resources/generate_neuropixels_library.py | 17 +- src/probeinterface/neuropixels_tools.py | 189 +++++++++++----------- 2 files changed, 110 insertions(+), 96 deletions(-) diff --git a/resources/generate_neuropixels_library.py b/resources/generate_neuropixels_library.py index b4055db..7ddb7b9 100644 --- a/resources/generate_neuropixels_library.py +++ b/resources/generate_neuropixels_library.py @@ -6,7 +6,10 @@ import numpy as np import matplotlib.pyplot as plt -from probeinterface.neuropixels_tools import _make_npx_probe_from_description, get_probe_metadata_from_probe_features +from probeinterface.neuropixels_tools import ( + _make_npx_probe_from_description, + _extract_probe_geometry, +) from probeinterface.plotting import plot_probe from probeinterface import write_probeinterface @@ -41,10 +44,12 @@ def generate_all_npx(base_folder=None): probe_folder = base_folder / model_name probe_folder.mkdir(exist_ok=True) - pt_metadata, _, _ = get_probe_metadata_from_probe_features(probe_features, model_name) + probe_spec_dict = probe_features["neuropixels_probes"][model_name] + probe_geometry = _extract_probe_geometry(probe_spec_dict) + mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] - num_shank = pt_metadata["num_shanks"] - contact_per_shank = pt_metadata["cols_per_shank"] * pt_metadata["rows_per_shank"] + num_shank = probe_geometry["num_shanks"] + contact_per_shank = probe_geometry["cols_per_shank"] * probe_geometry["rows_per_shank"] if num_shank == 1: elec_ids = np.arange(contact_per_shank) shank_ids = None @@ -52,7 +57,7 @@ def generate_all_npx(base_folder=None): elec_ids = np.concatenate([np.arange(contact_per_shank) for i in range(num_shank)]) shank_ids = np.concatenate([np.zeros(contact_per_shank) + i for i in range(num_shank)]) - probe = _make_npx_probe_from_description(pt_metadata, model_name, elec_ids, shank_ids) + probe = _make_npx_probe_from_description(probe_geometry, model_name, elec_ids, shank_ids, mux_table_string) # plotting fig, axs = plt.subplots(ncols=2) @@ -73,7 +78,7 @@ def generate_all_npx(base_folder=None): plot_probe(probe, ax=ax) ax.set_title("") - yp = pt_metadata["electrode_pitch_vert_um"] + yp = probe_geometry["electrode_pitch_vert_um"] ax.set_ylim(-yp*8, yp*13) ax.yaxis.set_visible(False) ax.spines["top"].set_visible(False) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index 23d6531..82ab9ba 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -24,7 +24,7 @@ def _load_np_probe_features(): - # this avoid loading the json several time + # this avoid loading the json several times global _np_probe_features if _np_probe_features is None: probe_features_filepath = Path(__file__).absolute().parent / Path("resources/neuropixels_probe_features.json") @@ -243,27 +243,27 @@ def read_imro(file_path: Union[str, Path]) -> Probe: return _read_imro_string(imro_str, imDatPrb_pn) -def _make_npx_probe_from_description(probe_description, model_name, elec_ids, shank_ids, mux_info=None) -> Probe: +def _make_npx_probe_from_description(probe_geometry, model_name, elec_ids, shank_ids, mux_table_string=None) -> Probe: # used by _read_imro_string and for generating the NP library # compute position - y_idx, x_idx = np.divmod(elec_ids, probe_description["cols_per_shank"]) - x_pitch = probe_description["electrode_pitch_horz_um"] - y_pitch = probe_description["electrode_pitch_vert_um"] + y_idx, x_idx = np.divmod(elec_ids, probe_geometry["cols_per_shank"]) + x_pitch = probe_geometry["electrode_pitch_horz_um"] + y_pitch = probe_geometry["electrode_pitch_vert_um"] raw_stagger = ( - probe_description["even_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] - - probe_description["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] + probe_geometry["even_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] + - probe_geometry["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] ) stagger = np.mod(y_idx + 1, 2) * raw_stagger x_pos = (x_idx * x_pitch + stagger).astype("float64") y_pos = (y_idx * y_pitch).astype("float64") - # if probe_description["shank_number"] > 1: + # if probe_geometry["shank_number"] > 1: if shank_ids is not None: # shank_ids = np.array(contact_info["shank_id"]) - shank_pitch = probe_description["shank_pitch_um"] + shank_pitch = probe_geometry["shank_pitch_um"] contact_ids = [f"s{shank_id}e{elec_id}" for shank_id, elec_id in zip(shank_ids, elec_ids)] x_pos += np.array(shank_ids).astype(int) * shank_pitch else: @@ -274,12 +274,12 @@ def _make_npx_probe_from_description(probe_description, model_name, elec_ids, sh # construct Probe object probe = Probe(ndim=2, si_units="um", model_name=model_name, manufacturer="imec") - probe.description = probe_description["description"] + probe.description = probe_geometry["description"] probe.set_contacts( positions=positions, shapes="square", shank_ids=shank_ids, - shape_params={"width": probe_description["electrode_size_horz_direction_um"]}, + shape_params={"width": probe_geometry["electrode_size_horz_direction_um"]}, ) probe.set_contact_ids(contact_ids) @@ -287,13 +287,13 @@ def _make_npx_probe_from_description(probe_description, model_name, elec_ids, sh # Add planar contour polygon = np.array( get_probe_contour_vertices( - probe_description["shank_width_um"], probe_description["tip_length_um"], get_probe_length(model_name) + probe_geometry["shank_width_um"], probe_geometry["tip_length_um"], get_probe_length(model_name) ) ) contour = [] - shank_pitch = probe_description["shank_pitch_um"] - for shank_id in range(probe_description["num_shanks"]): + shank_pitch = probe_geometry["shank_pitch_um"] + for shank_id in range(probe_geometry["num_shanks"]): shank_shift = np.array([shank_pitch * shank_id, 0]) contour += list(polygon + shank_shift) @@ -301,7 +301,7 @@ def _make_npx_probe_from_description(probe_description, model_name, elec_ids, sh middle_of_bottommost_electrode_to_top_of_shank_tip = 11 contour_shift = np.array( [ - -probe_description["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"], + -probe_geometry["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"], -middle_of_bottommost_electrode_to_top_of_shank_tip, ] ) @@ -310,7 +310,7 @@ def _make_npx_probe_from_description(probe_description, model_name, elec_ids, sh # shank tips : minimum of the polygon shank_tips = [] - for shank_id in range(probe_description["num_shanks"]): + for shank_id in range(probe_geometry["num_shanks"]): shank_shift = np.array([shank_pitch * shank_id, 0]) shank_tip = np.array(polygon[2]) + contour_shift + shank_shift shank_tips.append(shank_tip.tolist()) @@ -322,16 +322,16 @@ def _make_npx_probe_from_description(probe_description, model_name, elec_ids, sh # set other key metadata annotations probe.annotate( - adc_bit_depth=int(probe_description["adc_bit_depth"]), - num_readout_channels=int(probe_description["num_readout_channels"]), - ap_sample_frequency_hz=float(probe_description["ap_sample_frequency_hz"]), - lf_sample_frequency_hz=float(probe_description["lf_sample_frequency_hz"]), + adc_bit_depth=int(probe_geometry["adc_bit_depth"]), + num_readout_channels=int(probe_geometry["num_readout_channels"]), + ap_sample_frequency_hz=float(probe_geometry["ap_sample_frequency_hz"]), + lf_sample_frequency_hz=float(probe_geometry["lf_sample_frequency_hz"]), ) # annotate with MUX table - if mux_info is not None: + if mux_table_string is not None: # annotate each contact with its mux channel - num_adcs, num_channels_per_adc, mux_table = make_mux_table_array(mux_info) + num_adcs, num_channels_per_adc, mux_table = make_mux_table_array(mux_table_string) num_contacts = positions.shape[0] # ADC group: which adc is used for each contact adc_groups = np.zeros(num_contacts, dtype="int64") @@ -377,14 +377,17 @@ def _read_imro_string(imro_str: str, imDatPrb_pn: Optional[str] = None) -> Probe probe_type = probe_type_num_chans.split(",")[0][1:] probe_features = _load_np_probe_features() - pt_metadata, fields, mux_info = get_probe_metadata_from_probe_features(probe_features, imDatPrb_pn) + probe_spec_dict = probe_features["neuropixels_probes"][imDatPrb_pn] + probe_geometry = _extract_probe_geometry(probe_spec_dict) + imro_field_names = _get_imro_field_names(probe_features, probe_spec_dict) + mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] - # fields = probe_description["fields_in_imro_table"] - contact_info = {k: [] for k in fields} + # Parse IMRO table entries using field names + contact_info = {k: [] for k in imro_field_names} for field_values_str in imro_table_values_list: # Imro table values look like '(value, value, value, ... ' # Split them by space to get int('value'), int('value'), int('value'), ...) values = tuple(map(int, field_values_str[1:].split(" "))) - for field, field_value in zip(fields, values): + for field, field_value in zip(imro_field_names, values): contact_info[field].append(field_value) channel_ids = np.array(contact_info["channel"]) @@ -398,12 +401,12 @@ def _read_imro_string(imro_str: str, imDatPrb_pn: Optional[str] = None) -> Probe banks = np.array(contact_info[bank_key]) elec_ids = banks * 384 + channel_ids - if pt_metadata["num_shanks"] > 1: + if probe_geometry["num_shanks"] > 1: shank_ids = np.array(contact_info["shank"]) else: shank_ids = None - probe = _make_npx_probe_from_description(pt_metadata, imDatPrb_pn, elec_ids, shank_ids, mux_info) + probe = _make_npx_probe_from_description(probe_geometry, imDatPrb_pn, elec_ids, shank_ids, mux_table_string) # scalar annotations probe.annotate( @@ -424,64 +427,68 @@ def _read_imro_string(imro_str: str, imDatPrb_pn: Optional[str] = None) -> Probe return probe -def get_probe_metadata_from_probe_features(probe_features: dict, imDatPrb_pn: str): +def _extract_probe_geometry(probe_spec_dict: dict) -> dict: """ - Parses the `probe_features` dict, to cast string to appropriate types - and parses the imro_table_fields string. Returns the metadata needed - to construct a probe with part number `imDatPrb_pn`. + Extract probe geometry from probe specification, converting string values to appropriate numeric types. Parameters ---------- - probe_features : dict - Dictionary obtained when reading in the `neuropixels_probe_features.json` file. - imDatPrb_pn : str - Probe part number. + probe_spec_dict : dict + Raw probe specification dictionary from probe_features["neuropixels_probes"][probe_name] Returns ------- - probe_metadata, imro_field, mux_information - Dictionary of probe metadata. - Tuple of fields included in the `imro_table_fields`. - Mux table information, if available, as a string. + dict + A new dictionary with numeric fields converted to int or float """ + int_geometry_keys = [ + "num_shanks", "cols_per_shank", "rows_per_shank", "adc_bit_depth", "num_readout_channels" + ] + float_geometry_keys = [ + "electrode_pitch_horz_um", + "electrode_pitch_vert_um", + "electrode_size_horz_direction_um", + "shank_pitch_um", + "shank_width_um", + "tip_length_um", + "even_row_horz_offset_left_edge_to_leftmost_electrode_center_um", + "odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um", + ] - probe_metadata = probe_features["neuropixels_probes"].get(imDatPrb_pn) - for key in probe_metadata.keys(): - if key in ["num_shanks", "cols_per_shank", "rows_per_shank", "adc_bit_depth", "num_readout_channels"]: - probe_metadata[key] = int(probe_metadata[key]) - elif key in [ - "electrode_pitch_horz_um", - "electrode_pitch_vert_um", - "electrode_size_horz_direction_um", - "shank_pitch_um", - "shank_width_um", - "tip_length_um", - "even_row_horz_offset_left_edge_to_leftmost_electrode_center_um", - "odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um", - ]: - probe_metadata[key] = float(probe_metadata[key]) - - # Read the imro table formats to find out which fields the imro tables contain - imro_table_format_type = probe_metadata["imro_table_format_type"] - imro_table_fields = probe_features["z_imro_formats"][imro_table_format_type + "_elm_flds"] + probe_geometry = {} + for key, value in probe_spec_dict.items(): + if key in int_geometry_keys: + probe_geometry[key] = int(value) + elif key in float_geometry_keys: + probe_geometry[key] = float(value) + else: + probe_geometry[key] = value - # parse the imro_table_fields, which look like (value value value ...) - list_of_imro_fields = imro_table_fields.replace("(", "").replace(")", "").split(" ") + return probe_geometry - imro_fields_list = [] - for imro_field in list_of_imro_fields: - imro_fields_list.append(imro_field) - imro_fields = tuple(imro_fields_list) +def _get_imro_field_names(probe_features: dict, probe_spec_dict: dict) -> tuple: + """ + Get IMRO table field names for parsing IMRO table entries. - # Read MUX table information - mux_information = None + Parameters + ---------- + probe_features : dict + Full probe features dictionary from neuropixels_probe_features.json + probe_spec_dict : dict + Probe specification dictionary (must contain 'imro_table_format_type') - if "z_mux_tables" in probe_features: - mux_table_format_type = probe_metadata.get("mux_table_format_type", None) - mux_information = probe_features["z_mux_tables"].get(mux_table_format_type, None) + Returns + ------- + tuple + Field names for IMRO table parsing, e.g., ('channel', 'bank', 'ref_id', 'ap_gain', ...) + """ + imro_table_format_type = probe_spec_dict["imro_table_format_type"] + imro_table_fields = probe_features["z_imro_formats"][imro_table_format_type + "_elm_flds"] - return probe_metadata, imro_fields, mux_information + # Parse the field string, which looks like "(channel bank ref_id ...)" + fields_str = imro_table_fields.replace("(", "").replace(")", "") + return tuple(fields_str.split(" ")) def write_imro(file: str | Path, probe: Probe): @@ -892,13 +899,15 @@ def read_openephys( selected_electrodes = np_probe.find("SELECTED_ELECTRODES") channels = np_probe.find("CHANNELS") - pt_metadata, _, mux_info = get_probe_metadata_from_probe_features(probe_features, probe_part_number) + probe_spec_dict = probe_features["neuropixels_probes"][probe_part_number] + probe_geometry = _extract_probe_geometry(probe_spec_dict) + mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] if selected_electrodes is not None: selected_electrodes_values = selected_electrodes.attrib.values() - num_shank = pt_metadata["num_shanks"] - contact_per_shank = pt_metadata["cols_per_shank"] * pt_metadata["rows_per_shank"] + num_shank = probe_geometry["num_shanks"] + contact_per_shank = probe_geometry["cols_per_shank"] * probe_geometry["rows_per_shank"] if num_shank == 1: elec_ids = np.arange(contact_per_shank, dtype=int) @@ -908,7 +917,7 @@ def read_openephys( shank_ids = np.concatenate([np.zeros(contact_per_shank, dtype=int) + i for i in range(num_shank)]) full_probe = _make_npx_probe_from_description( - pt_metadata, probe_part_number, elec_ids, shank_ids, mux_info=mux_info + probe_geometry, probe_part_number, elec_ids, shank_ids, mux_table_string=mux_table_string ) selected_electrode_indices = [int(electrode_index) for electrode_index in selected_electrodes_values] @@ -916,10 +925,10 @@ def read_openephys( sliced_probe = full_probe.get_slice(selection=selected_electrode_indices) np_probe_dict = { - "pt_metadata": pt_metadata, + "probe_geometry": probe_geometry, "serial_number": probe_serial_number, "part_number": probe_part_number, - "mux_info": mux_info, + "mux_table_string": mux_table_string, "probe": sliced_probe, } else: @@ -951,7 +960,7 @@ def read_openephys( ypos = np.array([float(electrode_ypos.attrib[ch]) for ch in channel_names]) positions = np.array([xpos, ypos]).T - shank_pitch = pt_metadata["shank_pitch_um"] + shank_pitch = probe_geometry["shank_pitch_um"] if fix_x_position_for_oe_5 and oe_version < parse("0.6.0") and shank_ids is not None: positions[:, 1] = positions[:, 1] - shank_pitch * shank_ids @@ -964,20 +973,20 @@ def read_openephys( positions[:, 0] -= offset # - y_pitch = pt_metadata[ + y_pitch = probe_geometry[ "electrode_pitch_vert_um" ] # Vertical spacing between the centers of adjacent contacts - x_pitch = pt_metadata[ + x_pitch = probe_geometry[ "electrode_pitch_horz_um" ] # Horizontal spacing between the centers of contacts within the same row - number_of_columns = pt_metadata["cols_per_shank"] + number_of_columns = probe_geometry["cols_per_shank"] probe_stagger = ( - pt_metadata["even_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] - - pt_metadata["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] + probe_geometry["even_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] + - probe_geometry["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] ) - num_shanks = pt_metadata["num_shanks"] + num_shanks = probe_geometry["num_shanks"] - description = pt_metadata.get("description") + description = probe_geometry.get("description") elec_ids = [] for i, pos in enumerate(positions): @@ -1009,13 +1018,13 @@ def read_openephys( np_probe_dict = { "shank_ids": shank_ids, "elec_ids": elec_ids, - "pt_metadata": pt_metadata, + "probe_geometry": probe_geometry, "slot": slot, "port": port, "dock": dock, "serial_number": probe_serial_number, "part_number": probe_part_number, - "mux_info": mux_info, + "mux_table_string": mux_table_string, } # Sequentially assign probe names @@ -1124,11 +1133,11 @@ def read_openephys( # check if subset of channels shank_ids = np_probe_info["shank_ids"] elec_ids = np_probe_info["elec_ids"] - pt_metadata = np_probe_info["pt_metadata"] - mux_info = np_probe_info["mux_info"] + probe_geometry = np_probe_info["probe_geometry"] + mux_table_string = np_probe_info["mux_table_string"] probe = _make_npx_probe_from_description( - pt_metadata, probe_part_number, elec_ids, shank_ids=shank_ids, mux_info=mux_info + probe_geometry, probe_part_number, elec_ids, shank_ids=shank_ids, mux_table_string=mux_table_string ) chans_saved = get_saved_channel_indices_from_openephys_settings(settings_file, stream_name=stream_name) From c00bbd03cef958b1b25bab356034286e4d497af9 Mon Sep 17 00:00:00 2001 From: Heberto Mayorquin Date: Wed, 19 Nov 2025 16:54:21 -0600 Subject: [PATCH 02/15] separate build from slicing --- resources/generate_neuropixels_library.py | 28 +-- src/probeinterface/neuropixels_tools.py | 236 +++++++++++++++++----- 2 files changed, 191 insertions(+), 73 deletions(-) diff --git a/resources/generate_neuropixels_library.py b/resources/generate_neuropixels_library.py index 7ddb7b9..5357965 100644 --- a/resources/generate_neuropixels_library.py +++ b/resources/generate_neuropixels_library.py @@ -6,10 +6,7 @@ import numpy as np import matplotlib.pyplot as plt -from probeinterface.neuropixels_tools import ( - _make_npx_probe_from_description, - _extract_probe_geometry, -) +from probeinterface.neuropixels_tools import build_neuropixels_probe from probeinterface.plotting import plot_probe from probeinterface import write_probeinterface @@ -31,6 +28,8 @@ def generate_all_npx(base_folder=None): probe_part_numbers = probe_features['neuropixels_probes'].keys() + # Note: model_name here is the probe_part_number (specific SKU like "NP1000"), + # NOT the model/platform (like "Neuropixels 1.0", "Neuropixels 2.0", "Ultra", or "NHP") for model_name in probe_part_numbers: print(model_name) @@ -44,20 +43,8 @@ def generate_all_npx(base_folder=None): probe_folder = base_folder / model_name probe_folder.mkdir(exist_ok=True) - probe_spec_dict = probe_features["neuropixels_probes"][model_name] - probe_geometry = _extract_probe_geometry(probe_spec_dict) - mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] - - num_shank = probe_geometry["num_shanks"] - contact_per_shank = probe_geometry["cols_per_shank"] * probe_geometry["rows_per_shank"] - if num_shank == 1: - elec_ids = np.arange(contact_per_shank) - shank_ids = None - else: - elec_ids = np.concatenate([np.arange(contact_per_shank) for i in range(num_shank)]) - shank_ids = np.concatenate([np.zeros(contact_per_shank) + i for i in range(num_shank)]) - - probe = _make_npx_probe_from_description(probe_geometry, model_name, elec_ids, shank_ids, mux_table_string) + # Build the full probe with all possible contacts from the probe part number + probe = build_neuropixels_probe(model_name) # plotting fig, axs = plt.subplots(ncols=2) @@ -78,7 +65,10 @@ def generate_all_npx(base_folder=None): plot_probe(probe, ax=ax) ax.set_title("") - yp = probe_geometry["electrode_pitch_vert_um"] + # Get vertical pitch from contact positions (minimum non-zero y-distance) + positions = probe.contact_positions + y_positions = np.unique(positions[:, 1]) + yp = np.min(np.diff(y_positions)) ax.set_ylim(-yp*8, yp*13) ax.yaxis.set_visible(False) ax.spines["top"].set_visible(False) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index 82ab9ba..31ebb27 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -243,7 +243,7 @@ def read_imro(file_path: Union[str, Path]) -> Probe: return _read_imro_string(imro_str, imDatPrb_pn) -def _make_npx_probe_from_description(probe_geometry, model_name, elec_ids, shank_ids, mux_table_string=None) -> Probe: +def _make_npx_probe_from_description(probe_geometry, probe_part_number, elec_ids, shank_ids, mux_table_string=None) -> Probe: # used by _read_imro_string and for generating the NP library # compute position @@ -273,7 +273,7 @@ def _make_npx_probe_from_description(probe_geometry, model_name, elec_ids, shank positions = np.stack((x_pos, y_pos), axis=1) # construct Probe object - probe = Probe(ndim=2, si_units="um", model_name=model_name, manufacturer="imec") + probe = Probe(ndim=2, si_units="um", model_name=probe_part_number, manufacturer="imec") probe.description = probe_geometry["description"] probe.set_contacts( positions=positions, @@ -287,7 +287,7 @@ def _make_npx_probe_from_description(probe_geometry, model_name, elec_ids, shank # Add planar contour polygon = np.array( get_probe_contour_vertices( - probe_geometry["shank_width_um"], probe_geometry["tip_length_um"], get_probe_length(model_name) + probe_geometry["shank_width_um"], probe_geometry["tip_length_um"], get_probe_length(probe_part_number) ) ) @@ -317,9 +317,6 @@ def _make_npx_probe_from_description(probe_geometry, model_name, elec_ids, shank probe.annotate(shank_tips=shank_tips) - # wire it - probe.set_device_channel_indices(np.arange(positions.shape[0])) - # set other key metadata annotations probe.annotate( adc_bit_depth=int(probe_geometry["adc_bit_depth"]), @@ -349,38 +346,115 @@ def _make_npx_probe_from_description(probe_geometry, model_name, elec_ids, shank return probe -def _read_imro_string(imro_str: str, imDatPrb_pn: Optional[str] = None) -> Probe: +def build_neuropixels_probe(probe_part_number: str) -> Probe: """ - Parse the IMRO table when presented as a string and create a Probe object. + Build a Neuropixels probe with all possible contacts from the probe part number. + + This function constructs a complete probe geometry including all physical contacts, + MUX table information, and probe-level metadata. The resulting probe contains ALL + electrodes (e.g., 960 for NP1.0, 1280 for NP2.0), not just the subset that might + be recorded in an actual experiment. + + Parameters + ---------- + probe_part_number : str + Probe part number (specific SKU identifier). + Examples: "NP1000", "NP2000", "NP1010", "NP2003", "NP2004" + + Note: This is the specific SKU, not the model/platform name: + - probe_part_number is like "NP1000" (specific SKU) + - NOT like "Neuropixels 1.0" or "Neuropixels 2.0" (platform family) + + In SpikeGLX meta files, this corresponds to the `imDatPrb_pn` field. + Multiple part numbers may belong to the same platform family but have + different configurations or variants. + + Returns + ------- + probe : Probe + Full probe object with all possible contacts, including: + - Contact positions (x, y coordinates) + - Contact IDs (e.g., "e0", "e1", ..., "e959") + - Planar contour (probe outline) + - Shank tips (for multi-shank probes) + - Device channel indices + - MUX table annotations (ADC groups, sample order) + - Probe-level metadata (sample rates, ADC bit depth, etc.) + """ + # Load probe features from JSON + probe_features = _load_np_probe_features() + probe_spec_dict = probe_features["neuropixels_probes"][probe_part_number] + + # Extract and convert geometry + probe_geometry = _extract_probe_geometry(probe_spec_dict) + + # Get MUX table + mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] + + # Calculate ALL electrode IDs (all possible contacts) + num_shanks = probe_geometry["num_shanks"] + contacts_per_shank = probe_geometry["cols_per_shank"] * probe_geometry["rows_per_shank"] + + if num_shanks == 1: + elec_ids = np.arange(contacts_per_shank, dtype=int) + shank_ids = None + else: + elec_ids = np.concatenate([np.arange(contacts_per_shank, dtype=int) for i in range(num_shanks)]) + shank_ids = np.concatenate([np.zeros(contacts_per_shank, dtype=int) + i for i in range(num_shanks)]) + + # Build the full probe + probe = _make_npx_probe_from_description( + probe_geometry, probe_part_number, elec_ids, shank_ids, mux_table_string + ) + + return probe + + +def _parse_imro_table(imro_str: str, probe_part_number: str) -> tuple[str, np.ndarray, dict]: + """ + Parse the IMRO table to extract recorded electrode IDs and configuration settings. + + This function parses the IMRO (IMEC Readout) table from SpikeGLX meta files to determine + which electrodes were actually recorded and their configuration (gains, filters, references). + It does NOT build a probe object - use build_neuropixels_probe() for that. Parameters ---------- imro_str : str - IMRO table as a string. - imDatPrb_pn : str, optional - Probe number, by default None. + IMRO table as a string from SpikeGLX meta file. + probe_part_number : str + Probe part number (e.g., "NP1000", "NP2000"). This is the `imDatPrb_pn` field + from the SpikeGLX meta file. Returns ------- - Probe - A Probe object built from the parsed IMRO table data. + probe_type : str + Numeric probe type from IMRO header (e.g., "0" for NP1.0, "21" for NP2.0) + elec_ids : np.ndarray + Array of electrode IDs that were recorded (e.g., 384 values for NP1.0) + contact_annotations : dict + Dictionary of per-contact annotations including: + - channel_ids: recorded channel indices + - banks: bank selection per channel (NP1.0) + - references: reference electrode selection + - ap_gains: AP band gain settings + - lf_gains: LF band gain settings + - ap_hp_filters: AP high-pass filter on/off See Also -------- + build_neuropixels_probe : Build complete probe geometry https://billkarsh.github.io/SpikeGLX/help/imroTables/ - """ - probe_type_num_chans, *imro_table_values_list, _ = imro_str.strip().split(")") # probe_type_num_chans looks like f"({probe_type},{num_chans}" probe_type = probe_type_num_chans.split(",")[0][1:] + # Load probe features to get IMRO field names probe_features = _load_np_probe_features() - probe_spec_dict = probe_features["neuropixels_probes"][imDatPrb_pn] - probe_geometry = _extract_probe_geometry(probe_spec_dict) + probe_spec_dict = probe_features["neuropixels_probes"][probe_part_number] imro_field_names = _get_imro_field_names(probe_features, probe_spec_dict) - mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] # Parse IMRO table entries using field names contact_info = {k: [] for k in imro_field_names} @@ -390,10 +464,13 @@ def _read_imro_string(imro_str: str, imDatPrb_pn: Optional[str] = None) -> Probe for field, field_value in zip(imro_field_names, values): contact_info[field].append(field_value) + # Extract electrode IDs from the IMRO table channel_ids = np.array(contact_info["channel"]) if "electrode" in contact_info: + # NP2.0+: electrode IDs are explicit in IMRO table elec_ids = np.array(contact_info["electrode"]) else: + # NP1.0: calculate electrode IDs from bank selection if contact_info.get("bank") is not None: bank_key = "bank" elif contact_info.get("bank_mask") is not None: @@ -401,28 +478,65 @@ def _read_imro_string(imro_str: str, imDatPrb_pn: Optional[str] = None) -> Probe banks = np.array(contact_info[bank_key]) elec_ids = banks * 384 + channel_ids - if probe_geometry["num_shanks"] > 1: - shank_ids = np.array(contact_info["shank"]) - else: - shank_ids = None + # Convert IMRO field names to ProbeInterface naming for backwards compatibility + vector_properties = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt") + contact_annotations = {} + for k, v in contact_info.items(): + if (k in vector_properties) and (len(v) > 0): + contact_annotations[imro_field_to_pi_field.get(k)] = v - probe = _make_npx_probe_from_description(probe_geometry, imDatPrb_pn, elec_ids, shank_ids, mux_table_string) + # Add probe_type as a scalar annotation + contact_annotations["probe_type"] = probe_type - # scalar annotations - probe.annotate( - probe_type=probe_type, - ) + return probe_type, elec_ids, contact_annotations - # vector annotations - vector_properties = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt") - vector_properties_available = {} - for k, v in contact_info.items(): - if (k in vector_properties) and (len(v) > 0): - # convert to ProbeInterface naming for backwards compatibility - vector_properties_available[imro_field_to_pi_field.get(k)] = v +def _read_imro_string(imro_str: str, imDatPrb_pn: Optional[str] = None) -> Probe: + """ + Parse the IMRO table when presented as a string and create a Probe object. + + DEPRECATED: This function is kept for backwards compatibility but will be removed + in a future version. Use build_neuropixels_probe() + _parse_imro_table() instead. + + Parameters + ---------- + imro_str : str + IMRO table as a string. + imDatPrb_pn : str, optional + Probe number, by default None. + + Returns + ------- + Probe + A Probe object built from the parsed IMRO table data. + + See Also + -------- + build_neuropixels_probe : Build complete probe geometry + _parse_imro_table : Parse IMRO table configuration + https://billkarsh.github.io/SpikeGLX/help/imroTables/ + """ + # Parse IMRO table + probe_type, elec_ids, contact_annotations = _parse_imro_table(imro_str, imDatPrb_pn) - probe.annotate_contacts(**vector_properties_available) + # Build full probe + probe = build_neuropixels_probe(imDatPrb_pn) + + # Slice the full probe to only include the recorded electrodes from the IMRO table. + # For example, this reduces 960 total contacts down to the 384 actually recorded. + keep_indices = [] + for elec_id in elec_ids: + matching_indices = np.where(probe.contact_ids == f"e{elec_id}")[0] + if len(matching_indices) > 0: + keep_indices.append(matching_indices[0]) + + probe = probe.get_slice(keep_indices) + + # Add probe type annotation + probe.annotate(probe_type=contact_annotations.pop("probe_type")) + + # Add contact annotations from IMRO table + probe.annotate_contacts(**contact_annotations) return probe @@ -577,29 +691,43 @@ def read_spikeglx(file: str | Path) -> Probe: assert "imroTbl" in meta, "Could not find imroTbl field in meta file!" imro_table = meta["imroTbl"] - # read serial number - imDatPrb_serial_number = meta.get("imDatPrb_sn", None) - if imDatPrb_serial_number is None: # this is for Phase3A - imDatPrb_serial_number = meta.get("imProbeSN", None) + # Extract probe part number + probe_part_number = meta.get("imDatPrb_pn", None) + # Only Phase3a probe has "imProbeOpt". Map this to NP1010 + if meta.get("imProbeOpt") is not None: + probe_part_number = "NP1010" + + # Build the FULL probe with all possible contacts from the probe part number + probe = build_neuropixels_probe(probe_part_number) - # read other metadata - imDatPrb_pn = meta.get("imDatPrb_pn", None) - imDatPrb_port = meta.get("imDatPrb_port", None) - imDatPrb_slot = meta.get("imDatPrb_slot", None) - imDatPrb_part_number = meta.get("imDatPrb_pn", None) + # Parse IMRO table to get recorded electrode IDs and configuration + probe_type, elec_ids, contact_annotations = _parse_imro_table(imro_table, probe_part_number) - # Only Phase3a probe has "imProbeOpt". Map this to NP10101. - if meta.get("imProbeOpt") is not None: - imDatPrb_pn = "NP1010" + # Slice the full probe to only include the recorded electrodes from the IMRO table. + # For example, this reduces 960 total contacts down to the 384 actually recorded. + keep_indices = [] + for elec_id in elec_ids: + matching_indices = np.where(probe.contact_ids == f"e{elec_id}")[0] + if len(matching_indices) > 0: + keep_indices.append(matching_indices[0]) + probe = probe.get_slice(keep_indices) + + # Add probe type from IMRO header + probe.annotate(probe_type=contact_annotations.pop("probe_type")) - probe = _read_imro_string(imro_str=imro_table, imDatPrb_pn=imDatPrb_pn) + # Add contact annotations from IMRO table (gains, filters, references, banks) + probe.annotate_contacts(**contact_annotations) - # add serial number and other annotations - probe.annotate(serial_number=imDatPrb_serial_number) - probe.annotate(part_number=imDatPrb_part_number) - probe.annotate(port=imDatPrb_port) - probe.annotate(slot=imDatPrb_slot) - probe.annotate(serial_number=imDatPrb_serial_number) + # Add probe-level metadata from meta file + serial_number = meta.get("imDatPrb_sn", None) + if serial_number is None: # Phase3A uses different field name + serial_number = meta.get("imProbeSN", None) + probe.annotate( + serial_number=serial_number, + part_number=probe_part_number, + port=meta.get("imDatPrb_port", None), + slot=meta.get("imDatPrb_slot", None), + ) # sometimes we need to slice the probe when not all channels are saved saved_chans = get_saved_channel_indices_from_spikeglx_meta(meta_file) From d90c11061651c8097fec62776098290684b309f0 Mon Sep 17 00:00:00 2001 From: Heberto Mayorquin Date: Wed, 19 Nov 2025 20:13:56 -0600 Subject: [PATCH 03/15] make it simpler --- src/probeinterface/neuropixels_tools.py | 147 ++++++++++++++++-------- 1 file changed, 102 insertions(+), 45 deletions(-) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index 31ebb27..3735810 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -372,28 +372,15 @@ def build_neuropixels_probe(probe_part_number: str) -> Probe: Returns ------- probe : Probe - Full probe object with all possible contacts, including: - - Contact positions (x, y coordinates) - - Contact IDs (e.g., "e0", "e1", ..., "e959") - - Planar contour (probe outline) - - Shank tips (for multi-shank probes) - - Device channel indices - - MUX table annotations (ADC groups, sample order) - - Probe-level metadata (sample rates, ADC bit depth, etc.) + The complete Probe object with all contacts and metadata. """ - # Load probe features from JSON + # ===== 1. Load configuration ===== probe_features = _load_np_probe_features() probe_spec_dict = probe_features["neuropixels_probes"][probe_part_number] - # Extract and convert geometry - probe_geometry = _extract_probe_geometry(probe_spec_dict) - - # Get MUX table - mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] - - # Calculate ALL electrode IDs (all possible contacts) - num_shanks = probe_geometry["num_shanks"] - contacts_per_shank = probe_geometry["cols_per_shank"] * probe_geometry["rows_per_shank"] + # ===== 2. Calculate electrode IDs and shank IDs ===== + num_shanks = int(probe_spec_dict["num_shanks"]) + contacts_per_shank = int(probe_spec_dict["cols_per_shank"]) * int(probe_spec_dict["rows_per_shank"]) if num_shanks == 1: elec_ids = np.arange(contacts_per_shank, dtype=int) @@ -402,10 +389,95 @@ def build_neuropixels_probe(probe_part_number: str) -> Probe: elec_ids = np.concatenate([np.arange(contacts_per_shank, dtype=int) for i in range(num_shanks)]) shank_ids = np.concatenate([np.zeros(contacts_per_shank, dtype=int) + i for i in range(num_shanks)]) - # Build the full probe - probe = _make_npx_probe_from_description( - probe_geometry, probe_part_number, elec_ids, shank_ids, mux_table_string + # ===== 3. Calculate contact positions ===== + cols_per_shank = int(probe_spec_dict["cols_per_shank"]) + y_idx, x_idx = np.divmod(elec_ids, cols_per_shank) + + x_pitch = float(probe_spec_dict["electrode_pitch_horz_um"]) + y_pitch = float(probe_spec_dict["electrode_pitch_vert_um"]) + + even_offset = float(probe_spec_dict["even_row_horz_offset_left_edge_to_leftmost_electrode_center_um"]) + odd_offset = float(probe_spec_dict["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"]) + raw_stagger = even_offset - odd_offset + + stagger = np.mod(y_idx + 1, 2) * raw_stagger + x_pos = (x_idx * x_pitch + stagger).astype("float64") + y_pos = (y_idx * y_pitch).astype("float64") + + # Apply horizontal offset for multi-shank probes + if shank_ids is not None: + shank_pitch = float(probe_spec_dict["shank_pitch_um"]) + x_pos += np.array(shank_ids).astype(int) * shank_pitch + + positions = np.stack((x_pos, y_pos), axis=1) + + # ===== 4. Calculate contact IDs ===== + if shank_ids is not None: + contact_ids = [f"s{shank_id}e{elec_id}" for shank_id, elec_id in zip(shank_ids, elec_ids)] + else: + contact_ids = [f"e{elec_id}" for elec_id in elec_ids] + + # ===== 5. Create Probe object and set contacts ===== + probe = Probe(ndim=2, si_units="um", model_name=probe_part_number, manufacturer="imec") + probe.description = probe_spec_dict["description"] + probe.set_contacts( + positions=positions, + shapes="square", + shank_ids=shank_ids, + shape_params={"width": float(probe_spec_dict["electrode_size_horz_direction_um"])}, ) + probe.set_contact_ids(contact_ids) + + # ===== 6. Build probe contour and shank tips ===== + shank_width = float(probe_spec_dict["shank_width_um"]) + tip_length = float(probe_spec_dict["tip_length_um"]) + polygon = np.array(get_probe_contour_vertices(shank_width, tip_length, get_probe_length(probe_part_number))) + + # Build contour for all shanks + contour = [] + if shank_ids is not None: + shank_pitch = float(probe_spec_dict["shank_pitch_um"]) + for shank_id in range(num_shanks): + shank_shift = np.array([shank_pitch * shank_id if shank_ids is not None else 0, 0]) + contour += list(polygon + shank_shift) + + # Apply contour shift to align with contact positions + middle_of_bottommost_electrode_to_top_of_shank_tip = 11 + contour_shift = np.array([-odd_offset, -middle_of_bottommost_electrode_to_top_of_shank_tip]) + contour = np.array(contour) + contour_shift + probe.set_planar_contour(contour) + + # Calculate shank tips (polygon[2] is the tip vertex from get_probe_contour_vertices) + tip_vertex = polygon[2] + shank_tips = [] + for shank_id in range(num_shanks): + shank_shift = np.array([shank_pitch * shank_id if shank_ids is not None else 0, 0]) + shank_tip = np.array(tip_vertex) + contour_shift + shank_shift + shank_tips.append(shank_tip.tolist()) + probe.annotate(shank_tips=shank_tips) + + # ===== 7. Add metadata annotations ===== + probe.annotate( + adc_bit_depth=int(probe_spec_dict["adc_bit_depth"]), + num_readout_channels=int(probe_spec_dict["num_readout_channels"]), + ap_sample_frequency_hz=float(probe_spec_dict["ap_sample_frequency_hz"]), + lf_sample_frequency_hz=float(probe_spec_dict["lf_sample_frequency_hz"]), + ) + + # ===== 8. Add MUX table annotations ===== + mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] + num_adcs, num_channels_per_adc, mux_table = make_mux_table_array(mux_table_string) + num_contacts = positions.shape[0] + adc_groups = np.zeros(num_contacts, dtype="int64") + adc_sample_order = np.zeros(num_contacts, dtype="int64") + for adc_index, adc_groups_per_adc in enumerate(mux_table): + adc_groups_per_adc = adc_groups_per_adc[adc_groups_per_adc < num_contacts] + adc_groups[adc_groups_per_adc] = adc_index + adc_sample_order[adc_groups_per_adc] = np.arange(len(adc_groups_per_adc)) + probe.annotate(num_adcs=num_adcs) + probe.annotate(num_channels_per_adc=num_channels_per_adc) + probe.annotate_contacts(adc_group=adc_groups) + probe.annotate_contacts(adc_sample_order=adc_sample_order) return probe @@ -530,7 +602,7 @@ def _read_imro_string(imro_str: str, imDatPrb_pn: Optional[str] = None) -> Probe if len(matching_indices) > 0: keep_indices.append(matching_indices[0]) - probe = probe.get_slice(keep_indices) + probe = probe.get_slice(np.array(keep_indices, dtype=int)) # Add probe type annotation probe.annotate(probe_type=contact_annotations.pop("probe_type")) @@ -1027,39 +1099,24 @@ def read_openephys( selected_electrodes = np_probe.find("SELECTED_ELECTRODES") channels = np_probe.find("CHANNELS") - probe_spec_dict = probe_features["neuropixels_probes"][probe_part_number] - probe_geometry = _extract_probe_geometry(probe_spec_dict) - mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] - if selected_electrodes is not None: - selected_electrodes_values = selected_electrodes.attrib.values() - - num_shank = probe_geometry["num_shanks"] - contact_per_shank = probe_geometry["cols_per_shank"] * probe_geometry["rows_per_shank"] - - if num_shank == 1: - elec_ids = np.arange(contact_per_shank, dtype=int) - shank_ids = None - else: - elec_ids = np.concatenate([np.arange(contact_per_shank, dtype=int) for i in range(num_shank)]) - shank_ids = np.concatenate([np.zeros(contact_per_shank, dtype=int) + i for i in range(num_shank)]) - - full_probe = _make_npx_probe_from_description( - probe_geometry, probe_part_number, elec_ids, shank_ids, mux_table_string=mux_table_string - ) - - selected_electrode_indices = [int(electrode_index) for electrode_index in selected_electrodes_values] + # Build the FULL probe with all possible contacts from the probe part number + full_probe = build_neuropixels_probe(probe_part_number) + # Slice the full probe to only include the selected electrodes from OpenEphys settings + selected_electrode_indices = [int(electrode_index) for electrode_index in selected_electrodes.attrib.values()] sliced_probe = full_probe.get_slice(selection=selected_electrode_indices) np_probe_dict = { - "probe_geometry": probe_geometry, "serial_number": probe_serial_number, "part_number": probe_part_number, - "mux_table_string": mux_table_string, "probe": sliced_probe, } else: + # For older OpenEphys versions without SELECTED_ELECTRODES, we need probe_geometry for position calculations + probe_spec_dict = probe_features["neuropixels_probes"][probe_part_number] + probe_geometry = _extract_probe_geometry(probe_spec_dict) + mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] channel_names = np.array(list(channels.attrib.keys())) channel_ids = np.array([int(ch[2:]) for ch in channel_names]) From 4412a258c07540b1dbfe4da88b56fa01cb1a8f99 Mon Sep 17 00:00:00 2001 From: Heberto Mayorquin Date: Wed, 19 Nov 2025 20:29:09 -0600 Subject: [PATCH 04/15] add build_neuorpixels_probe method --- src/probeinterface/neuropixels_tools.py | 438 ++++++++++++------------ 1 file changed, 222 insertions(+), 216 deletions(-) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index 3735810..c4dea25 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -243,27 +243,27 @@ def read_imro(file_path: Union[str, Path]) -> Probe: return _read_imro_string(imro_str, imDatPrb_pn) -def _make_npx_probe_from_description(probe_geometry, probe_part_number, elec_ids, shank_ids, mux_table_string=None) -> Probe: +def _make_npx_probe_from_description(probe_description, model_name, elec_ids, shank_ids, mux_info=None) -> Probe: # used by _read_imro_string and for generating the NP library # compute position - y_idx, x_idx = np.divmod(elec_ids, probe_geometry["cols_per_shank"]) - x_pitch = probe_geometry["electrode_pitch_horz_um"] - y_pitch = probe_geometry["electrode_pitch_vert_um"] + y_idx, x_idx = np.divmod(elec_ids, probe_description["cols_per_shank"]) + x_pitch = probe_description["electrode_pitch_horz_um"] + y_pitch = probe_description["electrode_pitch_vert_um"] raw_stagger = ( - probe_geometry["even_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] - - probe_geometry["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] + probe_description["even_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] + - probe_description["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] ) stagger = np.mod(y_idx + 1, 2) * raw_stagger x_pos = (x_idx * x_pitch + stagger).astype("float64") y_pos = (y_idx * y_pitch).astype("float64") - # if probe_geometry["shank_number"] > 1: + # if probe_description["shank_number"] > 1: if shank_ids is not None: # shank_ids = np.array(contact_info["shank_id"]) - shank_pitch = probe_geometry["shank_pitch_um"] + shank_pitch = probe_description["shank_pitch_um"] contact_ids = [f"s{shank_id}e{elec_id}" for shank_id, elec_id in zip(shank_ids, elec_ids)] x_pos += np.array(shank_ids).astype(int) * shank_pitch else: @@ -273,13 +273,13 @@ def _make_npx_probe_from_description(probe_geometry, probe_part_number, elec_ids positions = np.stack((x_pos, y_pos), axis=1) # construct Probe object - probe = Probe(ndim=2, si_units="um", model_name=probe_part_number, manufacturer="imec") - probe.description = probe_geometry["description"] + probe = Probe(ndim=2, si_units="um", model_name=model_name, manufacturer="imec") + probe.description = probe_description["description"] probe.set_contacts( positions=positions, shapes="square", shank_ids=shank_ids, - shape_params={"width": probe_geometry["electrode_size_horz_direction_um"]}, + shape_params={"width": probe_description["electrode_size_horz_direction_um"]}, ) probe.set_contact_ids(contact_ids) @@ -287,13 +287,13 @@ def _make_npx_probe_from_description(probe_geometry, probe_part_number, elec_ids # Add planar contour polygon = np.array( get_probe_contour_vertices( - probe_geometry["shank_width_um"], probe_geometry["tip_length_um"], get_probe_length(probe_part_number) + probe_description["shank_width_um"], probe_description["tip_length_um"], get_probe_length(model_name) ) ) contour = [] - shank_pitch = probe_geometry["shank_pitch_um"] - for shank_id in range(probe_geometry["num_shanks"]): + shank_pitch = probe_description["shank_pitch_um"] + for shank_id in range(probe_description["num_shanks"]): shank_shift = np.array([shank_pitch * shank_id, 0]) contour += list(polygon + shank_shift) @@ -301,7 +301,7 @@ def _make_npx_probe_from_description(probe_geometry, probe_part_number, elec_ids middle_of_bottommost_electrode_to_top_of_shank_tip = 11 contour_shift = np.array( [ - -probe_geometry["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"], + -probe_description["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"], -middle_of_bottommost_electrode_to_top_of_shank_tip, ] ) @@ -310,25 +310,28 @@ def _make_npx_probe_from_description(probe_geometry, probe_part_number, elec_ids # shank tips : minimum of the polygon shank_tips = [] - for shank_id in range(probe_geometry["num_shanks"]): + for shank_id in range(probe_description["num_shanks"]): shank_shift = np.array([shank_pitch * shank_id, 0]) shank_tip = np.array(polygon[2]) + contour_shift + shank_shift shank_tips.append(shank_tip.tolist()) probe.annotate(shank_tips=shank_tips) + # wire it + probe.set_device_channel_indices(np.arange(positions.shape[0])) + # set other key metadata annotations probe.annotate( - adc_bit_depth=int(probe_geometry["adc_bit_depth"]), - num_readout_channels=int(probe_geometry["num_readout_channels"]), - ap_sample_frequency_hz=float(probe_geometry["ap_sample_frequency_hz"]), - lf_sample_frequency_hz=float(probe_geometry["lf_sample_frequency_hz"]), + adc_bit_depth=int(probe_description["adc_bit_depth"]), + num_readout_channels=int(probe_description["num_readout_channels"]), + ap_sample_frequency_hz=float(probe_description["ap_sample_frequency_hz"]), + lf_sample_frequency_hz=float(probe_description["lf_sample_frequency_hz"]), ) # annotate with MUX table - if mux_table_string is not None: + if mux_info is not None: # annotate each contact with its mux channel - num_adcs, num_channels_per_adc, mux_table = make_mux_table_array(mux_table_string) + num_adcs, num_channels_per_adc, mux_table = make_mux_table_array(mux_info) num_contacts = positions.shape[0] # ADC group: which adc is used for each contact adc_groups = np.zeros(num_contacts, dtype="int64") @@ -377,6 +380,7 @@ def build_neuropixels_probe(probe_part_number: str) -> Probe: # ===== 1. Load configuration ===== probe_features = _load_np_probe_features() probe_spec_dict = probe_features["neuropixels_probes"][probe_part_number] + mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] # ===== 2. Calculate electrode IDs and shank IDs ===== num_shanks = int(probe_spec_dict["num_shanks"]) @@ -386,7 +390,7 @@ def build_neuropixels_probe(probe_part_number: str) -> Probe: elec_ids = np.arange(contacts_per_shank, dtype=int) shank_ids = None else: - elec_ids = np.concatenate([np.arange(contacts_per_shank, dtype=int) for i in range(num_shanks)]) + elec_ids = np.concatenate([np.arange(contacts_per_shank, dtype=int) for _ in range(num_shanks)]) shank_ids = np.concatenate([np.zeros(contacts_per_shank, dtype=int) + i for i in range(num_shanks)]) # ===== 3. Calculate contact positions ===== @@ -465,84 +469,65 @@ def build_neuropixels_probe(probe_part_number: str) -> Probe: ) # ===== 8. Add MUX table annotations ===== - mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] - num_adcs, num_channels_per_adc, mux_table = make_mux_table_array(mux_table_string) - num_contacts = positions.shape[0] - adc_groups = np.zeros(num_contacts, dtype="int64") - adc_sample_order = np.zeros(num_contacts, dtype="int64") - for adc_index, adc_groups_per_adc in enumerate(mux_table): - adc_groups_per_adc = adc_groups_per_adc[adc_groups_per_adc < num_contacts] - adc_groups[adc_groups_per_adc] = adc_index - adc_sample_order[adc_groups_per_adc] = np.arange(len(adc_groups_per_adc)) - probe.annotate(num_adcs=num_adcs) - probe.annotate(num_channels_per_adc=num_channels_per_adc) - probe.annotate_contacts(adc_group=adc_groups) - probe.annotate_contacts(adc_sample_order=adc_sample_order) + if mux_table_string is not None: + num_adcs, num_channels_per_adc, mux_table = make_mux_table_array(mux_table_string) + num_contacts = positions.shape[0] + adc_groups = np.zeros(num_contacts, dtype="int64") + adc_sample_order = np.zeros(num_contacts, dtype="int64") + for adc_index, adc_groups_per_adc in enumerate(mux_table): + adc_groups_per_adc = adc_groups_per_adc[adc_groups_per_adc < num_contacts] + adc_groups[adc_groups_per_adc] = adc_index + adc_sample_order[adc_groups_per_adc] = np.arange(len(adc_groups_per_adc)) + probe.annotate(num_adcs=num_adcs) + probe.annotate(num_channels_per_adc=num_channels_per_adc) + probe.annotate_contacts(adc_group=adc_groups) + probe.annotate_contacts(adc_sample_order=adc_sample_order) return probe -def _parse_imro_table(imro_str: str, probe_part_number: str) -> tuple[str, np.ndarray, dict]: +def _read_imro_string(imro_str: str, imDatPrb_pn: Optional[str] = None) -> Probe: """ - Parse the IMRO table to extract recorded electrode IDs and configuration settings. - - This function parses the IMRO (IMEC Readout) table from SpikeGLX meta files to determine - which electrodes were actually recorded and their configuration (gains, filters, references). - It does NOT build a probe object - use build_neuropixels_probe() for that. + Parse the IMRO table when presented as a string and create a Probe object. Parameters ---------- imro_str : str - IMRO table as a string from SpikeGLX meta file. - probe_part_number : str - Probe part number (e.g., "NP1000", "NP2000"). This is the `imDatPrb_pn` field - from the SpikeGLX meta file. + IMRO table as a string. + imDatPrb_pn : str, optional + Probe number, by default None. Returns ------- - probe_type : str - Numeric probe type from IMRO header (e.g., "0" for NP1.0, "21" for NP2.0) - elec_ids : np.ndarray - Array of electrode IDs that were recorded (e.g., 384 values for NP1.0) - contact_annotations : dict - Dictionary of per-contact annotations including: - - channel_ids: recorded channel indices - - banks: bank selection per channel (NP1.0) - - references: reference electrode selection - - ap_gains: AP band gain settings - - lf_gains: LF band gain settings - - ap_hp_filters: AP high-pass filter on/off + Probe + A Probe object built from the parsed IMRO table data. See Also -------- - build_neuropixels_probe : Build complete probe geometry https://billkarsh.github.io/SpikeGLX/help/imroTables/ + """ + probe_type_num_chans, *imro_table_values_list, _ = imro_str.strip().split(")") # probe_type_num_chans looks like f"({probe_type},{num_chans}" probe_type = probe_type_num_chans.split(",")[0][1:] - # Load probe features to get IMRO field names probe_features = _load_np_probe_features() - probe_spec_dict = probe_features["neuropixels_probes"][probe_part_number] - imro_field_names = _get_imro_field_names(probe_features, probe_spec_dict) + pt_metadata, fields, mux_info = get_probe_metadata_from_probe_features(probe_features, imDatPrb_pn) - # Parse IMRO table entries using field names - contact_info = {k: [] for k in imro_field_names} + # fields = probe_description["fields_in_imro_table"] + contact_info = {k: [] for k in fields} for field_values_str in imro_table_values_list: # Imro table values look like '(value, value, value, ... ' # Split them by space to get int('value'), int('value'), int('value'), ...) values = tuple(map(int, field_values_str[1:].split(" "))) - for field, field_value in zip(imro_field_names, values): + for field, field_value in zip(fields, values): contact_info[field].append(field_value) - # Extract electrode IDs from the IMRO table channel_ids = np.array(contact_info["channel"]) if "electrode" in contact_info: - # NP2.0+: electrode IDs are explicit in IMRO table elec_ids = np.array(contact_info["electrode"]) else: - # NP1.0: calculate electrode IDs from bank selection if contact_info.get("bank") is not None: bank_key = "bank" elif contact_info.get("bank_mask") is not None: @@ -550,131 +535,90 @@ def _parse_imro_table(imro_str: str, probe_part_number: str) -> tuple[str, np.nd banks = np.array(contact_info[bank_key]) elec_ids = banks * 384 + channel_ids - # Convert IMRO field names to ProbeInterface naming for backwards compatibility - vector_properties = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt") - contact_annotations = {} - for k, v in contact_info.items(): - if (k in vector_properties) and (len(v) > 0): - contact_annotations[imro_field_to_pi_field.get(k)] = v - - # Add probe_type as a scalar annotation - contact_annotations["probe_type"] = probe_type - - return probe_type, elec_ids, contact_annotations - - -def _read_imro_string(imro_str: str, imDatPrb_pn: Optional[str] = None) -> Probe: - """ - Parse the IMRO table when presented as a string and create a Probe object. - - DEPRECATED: This function is kept for backwards compatibility but will be removed - in a future version. Use build_neuropixels_probe() + _parse_imro_table() instead. - - Parameters - ---------- - imro_str : str - IMRO table as a string. - imDatPrb_pn : str, optional - Probe number, by default None. - - Returns - ------- - Probe - A Probe object built from the parsed IMRO table data. - - See Also - -------- - build_neuropixels_probe : Build complete probe geometry - _parse_imro_table : Parse IMRO table configuration - https://billkarsh.github.io/SpikeGLX/help/imroTables/ - """ - # Parse IMRO table - probe_type, elec_ids, contact_annotations = _parse_imro_table(imro_str, imDatPrb_pn) + if pt_metadata["num_shanks"] > 1: + shank_ids = np.array(contact_info["shank"]) + else: + shank_ids = None - # Build full probe - probe = build_neuropixels_probe(imDatPrb_pn) + probe = _make_npx_probe_from_description(pt_metadata, imDatPrb_pn, elec_ids, shank_ids, mux_info) - # Slice the full probe to only include the recorded electrodes from the IMRO table. - # For example, this reduces 960 total contacts down to the 384 actually recorded. - keep_indices = [] - for elec_id in elec_ids: - matching_indices = np.where(probe.contact_ids == f"e{elec_id}")[0] - if len(matching_indices) > 0: - keep_indices.append(matching_indices[0]) + # scalar annotations + probe.annotate( + probe_type=probe_type, + ) - probe = probe.get_slice(np.array(keep_indices, dtype=int)) + # vector annotations + vector_properties = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt") - # Add probe type annotation - probe.annotate(probe_type=contact_annotations.pop("probe_type")) + vector_properties_available = {} + for k, v in contact_info.items(): + if (k in vector_properties) and (len(v) > 0): + # convert to ProbeInterface naming for backwards compatibility + vector_properties_available[imro_field_to_pi_field.get(k)] = v - # Add contact annotations from IMRO table - probe.annotate_contacts(**contact_annotations) + probe.annotate_contacts(**vector_properties_available) return probe -def _extract_probe_geometry(probe_spec_dict: dict) -> dict: +def get_probe_metadata_from_probe_features(probe_features: dict, imDatPrb_pn: str): """ - Extract probe geometry from probe specification, converting string values to appropriate numeric types. + Parses the `probe_features` dict, to cast string to appropriate types + and parses the imro_table_fields string. Returns the metadata needed + to construct a probe with part number `imDatPrb_pn`. Parameters ---------- - probe_spec_dict : dict - Raw probe specification dictionary from probe_features["neuropixels_probes"][probe_name] + probe_features : dict + Dictionary obtained when reading in the `neuropixels_probe_features.json` file. + imDatPrb_pn : str + Probe part number. Returns ------- - dict - A new dictionary with numeric fields converted to int or float + probe_metadata, imro_field, mux_information + Dictionary of probe metadata. + Tuple of fields included in the `imro_table_fields`. + Mux table information, if available, as a string. """ - int_geometry_keys = [ - "num_shanks", "cols_per_shank", "rows_per_shank", "adc_bit_depth", "num_readout_channels" - ] - float_geometry_keys = [ - "electrode_pitch_horz_um", - "electrode_pitch_vert_um", - "electrode_size_horz_direction_um", - "shank_pitch_um", - "shank_width_um", - "tip_length_um", - "even_row_horz_offset_left_edge_to_leftmost_electrode_center_um", - "odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um", - ] - probe_geometry = {} - for key, value in probe_spec_dict.items(): - if key in int_geometry_keys: - probe_geometry[key] = int(value) - elif key in float_geometry_keys: - probe_geometry[key] = float(value) - else: - probe_geometry[key] = value + probe_metadata = probe_features["neuropixels_probes"].get(imDatPrb_pn) + for key in probe_metadata.keys(): + if key in ["num_shanks", "cols_per_shank", "rows_per_shank", "adc_bit_depth", "num_readout_channels"]: + probe_metadata[key] = int(probe_metadata[key]) + elif key in [ + "electrode_pitch_horz_um", + "electrode_pitch_vert_um", + "electrode_size_horz_direction_um", + "shank_pitch_um", + "shank_width_um", + "tip_length_um", + "even_row_horz_offset_left_edge_to_leftmost_electrode_center_um", + "odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um", + ]: + probe_metadata[key] = float(probe_metadata[key]) + + # Read the imro table formats to find out which fields the imro tables contain + imro_table_format_type = probe_metadata["imro_table_format_type"] + imro_table_fields = probe_features["z_imro_formats"][imro_table_format_type + "_elm_flds"] - return probe_geometry + # parse the imro_table_fields, which look like (value value value ...) + list_of_imro_fields = imro_table_fields.replace("(", "").replace(")", "").split(" ") + imro_fields_list = [] + for imro_field in list_of_imro_fields: + imro_fields_list.append(imro_field) -def _get_imro_field_names(probe_features: dict, probe_spec_dict: dict) -> tuple: - """ - Get IMRO table field names for parsing IMRO table entries. + imro_fields = tuple(imro_fields_list) - Parameters - ---------- - probe_features : dict - Full probe features dictionary from neuropixels_probe_features.json - probe_spec_dict : dict - Probe specification dictionary (must contain 'imro_table_format_type') + # Read MUX table information + mux_information = None - Returns - ------- - tuple - Field names for IMRO table parsing, e.g., ('channel', 'bank', 'ref_id', 'ap_gain', ...) - """ - imro_table_format_type = probe_spec_dict["imro_table_format_type"] - imro_table_fields = probe_features["z_imro_formats"][imro_table_format_type + "_elm_flds"] + if "z_mux_tables" in probe_features: + mux_table_format_type = probe_metadata.get("mux_table_format_type", None) + mux_information = probe_features["z_mux_tables"].get(mux_table_format_type, None) - # Parse the field string, which looks like "(channel bank ref_id ...)" - fields_str = imro_table_fields.replace("(", "").replace(")", "") - return tuple(fields_str.split(" ")) + return probe_metadata, imro_fields, mux_information def write_imro(file: str | Path, probe: Probe): @@ -763,43 +707,92 @@ def read_spikeglx(file: str | Path) -> Probe: assert "imroTbl" in meta, "Could not find imroTbl field in meta file!" imro_table = meta["imroTbl"] - # Extract probe part number - probe_part_number = meta.get("imDatPrb_pn", None) - # Only Phase3a probe has "imProbeOpt". Map this to NP1010 + # read serial number + imDatPrb_serial_number = meta.get("imDatPrb_sn", None) + if imDatPrb_serial_number is None: # this is for Phase3A + imDatPrb_serial_number = meta.get("imProbeSN", None) + + # read other metadata + imDatPrb_pn = meta.get("imDatPrb_pn", None) + imDatPrb_port = meta.get("imDatPrb_port", None) + imDatPrb_slot = meta.get("imDatPrb_slot", None) + imDatPrb_part_number = meta.get("imDatPrb_pn", None) + + # Only Phase3a probe has "imProbeOpt". Map this to NP10101. if meta.get("imProbeOpt") is not None: - probe_part_number = "NP1010" + imDatPrb_pn = "NP1010" + + # ===== Step 1: Build full probe with all contacts ===== + full_probe = build_neuropixels_probe(imDatPrb_pn) + + # ===== Step 2: Parse IMRO table to get active electrode IDs and settings ===== + probe_type_num_chans, *imro_table_values_list, _ = imro_table.strip().split(")") - # Build the FULL probe with all possible contacts from the probe part number - probe = build_neuropixels_probe(probe_part_number) + # probe_type_num_chans looks like f"({probe_type},{num_chans}" + probe_type = probe_type_num_chans.split(",")[0][1:] + + probe_features = _load_np_probe_features() + _, fields, _ = get_probe_metadata_from_probe_features(probe_features, imDatPrb_pn) + + # Parse IMRO table values + contact_info = {k: [] for k in fields} + for field_values_str in imro_table_values_list: # Imro table values look like '(value, value, value, ... ' + # Split them by space to get int('value'), int('value'), int('value'), ...) + values = tuple(map(int, field_values_str[1:].split(" "))) + for field, field_value in zip(fields, values): + contact_info[field].append(field_value) - # Parse IMRO table to get recorded electrode IDs and configuration - probe_type, elec_ids, contact_annotations = _parse_imro_table(imro_table, probe_part_number) + # Extract electrode IDs from IMRO table + channel_ids = np.array(contact_info["channel"]) + if "electrode" in contact_info: + elec_ids = np.array(contact_info["electrode"]) + else: + if contact_info.get("bank") is not None: + bank_key = "bank" + elif contact_info.get("bank_mask") is not None: + bank_key = "bank_mask" + banks = np.array(contact_info[bank_key]) + elec_ids = banks * 384 + channel_ids - # Slice the full probe to only include the recorded electrodes from the IMRO table. - # For example, this reduces 960 total contacts down to the 384 actually recorded. + # ===== Step 3: Slice full probe to active electrodes ===== + # Find indices in full probe that match the active electrode IDs from IMRO keep_indices = [] for elec_id in elec_ids: - matching_indices = np.where(probe.contact_ids == f"e{elec_id}")[0] + # For multi-shank probes, we need to check if shank info is in contact_info + if "shank" in contact_info: + # Get the shank for this electrode from IMRO table + shank_idx = contact_info["shank"][len(keep_indices)] + contact_id = f"s{shank_idx}e{elec_id}" + else: + contact_id = f"e{elec_id}" + + # Search for this contact ID in the full probe + matching_indices = np.where(full_probe.contact_ids == contact_id)[0] if len(matching_indices) > 0: keep_indices.append(matching_indices[0]) - probe = probe.get_slice(keep_indices) - # Add probe type from IMRO header - probe.annotate(probe_type=contact_annotations.pop("probe_type")) + probe = full_probe.get_slice(np.array(keep_indices, dtype=int)) - # Add contact annotations from IMRO table (gains, filters, references, banks) - probe.annotate_contacts(**contact_annotations) + # ===== Step 4: Add IMRO-specific annotations ===== + # scalar annotations + probe.annotate(probe_type=probe_type) - # Add probe-level metadata from meta file - serial_number = meta.get("imDatPrb_sn", None) - if serial_number is None: # Phase3A uses different field name - serial_number = meta.get("imProbeSN", None) - probe.annotate( - serial_number=serial_number, - part_number=probe_part_number, - port=meta.get("imDatPrb_port", None), - slot=meta.get("imDatPrb_slot", None), - ) + # vector annotations + vector_properties = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt") + + vector_properties_available = {} + for k, v in contact_info.items(): + if (k in vector_properties) and (len(v) > 0): + # convert to ProbeInterface naming for backwards compatibility + vector_properties_available[imro_field_to_pi_field.get(k)] = v + + probe.annotate_contacts(**vector_properties_available) + + # add serial number and other annotations + probe.annotate(serial_number=imDatPrb_serial_number) + probe.annotate(part_number=imDatPrb_part_number) + probe.annotate(port=imDatPrb_port) + probe.annotate(slot=imDatPrb_slot) # sometimes we need to slice the probe when not all channels are saved saved_chans = get_saved_channel_indices_from_spikeglx_meta(meta_file) @@ -1099,24 +1092,37 @@ def read_openephys( selected_electrodes = np_probe.find("SELECTED_ELECTRODES") channels = np_probe.find("CHANNELS") + pt_metadata, _, mux_info = get_probe_metadata_from_probe_features(probe_features, probe_part_number) + if selected_electrodes is not None: - # Build the FULL probe with all possible contacts from the probe part number - full_probe = build_neuropixels_probe(probe_part_number) + selected_electrodes_values = selected_electrodes.attrib.values() + + num_shank = pt_metadata["num_shanks"] + contact_per_shank = pt_metadata["cols_per_shank"] * pt_metadata["rows_per_shank"] + + if num_shank == 1: + elec_ids = np.arange(contact_per_shank, dtype=int) + shank_ids = None + else: + elec_ids = np.concatenate([np.arange(contact_per_shank, dtype=int) for i in range(num_shank)]) + shank_ids = np.concatenate([np.zeros(contact_per_shank, dtype=int) + i for i in range(num_shank)]) + + full_probe = _make_npx_probe_from_description( + pt_metadata, probe_part_number, elec_ids, shank_ids, mux_info=mux_info + ) + + selected_electrode_indices = [int(electrode_index) for electrode_index in selected_electrodes_values] - # Slice the full probe to only include the selected electrodes from OpenEphys settings - selected_electrode_indices = [int(electrode_index) for electrode_index in selected_electrodes.attrib.values()] sliced_probe = full_probe.get_slice(selection=selected_electrode_indices) np_probe_dict = { + "pt_metadata": pt_metadata, "serial_number": probe_serial_number, "part_number": probe_part_number, + "mux_info": mux_info, "probe": sliced_probe, } else: - # For older OpenEphys versions without SELECTED_ELECTRODES, we need probe_geometry for position calculations - probe_spec_dict = probe_features["neuropixels_probes"][probe_part_number] - probe_geometry = _extract_probe_geometry(probe_spec_dict) - mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] channel_names = np.array(list(channels.attrib.keys())) channel_ids = np.array([int(ch[2:]) for ch in channel_names]) @@ -1145,7 +1151,7 @@ def read_openephys( ypos = np.array([float(electrode_ypos.attrib[ch]) for ch in channel_names]) positions = np.array([xpos, ypos]).T - shank_pitch = probe_geometry["shank_pitch_um"] + shank_pitch = pt_metadata["shank_pitch_um"] if fix_x_position_for_oe_5 and oe_version < parse("0.6.0") and shank_ids is not None: positions[:, 1] = positions[:, 1] - shank_pitch * shank_ids @@ -1158,20 +1164,20 @@ def read_openephys( positions[:, 0] -= offset # - y_pitch = probe_geometry[ + y_pitch = pt_metadata[ "electrode_pitch_vert_um" ] # Vertical spacing between the centers of adjacent contacts - x_pitch = probe_geometry[ + x_pitch = pt_metadata[ "electrode_pitch_horz_um" ] # Horizontal spacing between the centers of contacts within the same row - number_of_columns = probe_geometry["cols_per_shank"] + number_of_columns = pt_metadata["cols_per_shank"] probe_stagger = ( - probe_geometry["even_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] - - probe_geometry["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] + pt_metadata["even_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] + - pt_metadata["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"] ) - num_shanks = probe_geometry["num_shanks"] + num_shanks = pt_metadata["num_shanks"] - description = probe_geometry.get("description") + description = pt_metadata.get("description") elec_ids = [] for i, pos in enumerate(positions): @@ -1203,13 +1209,13 @@ def read_openephys( np_probe_dict = { "shank_ids": shank_ids, "elec_ids": elec_ids, - "probe_geometry": probe_geometry, + "pt_metadata": pt_metadata, "slot": slot, "port": port, "dock": dock, "serial_number": probe_serial_number, "part_number": probe_part_number, - "mux_table_string": mux_table_string, + "mux_info": mux_info, } # Sequentially assign probe names @@ -1318,11 +1324,11 @@ def read_openephys( # check if subset of channels shank_ids = np_probe_info["shank_ids"] elec_ids = np_probe_info["elec_ids"] - probe_geometry = np_probe_info["probe_geometry"] - mux_table_string = np_probe_info["mux_table_string"] + pt_metadata = np_probe_info["pt_metadata"] + mux_info = np_probe_info["mux_info"] probe = _make_npx_probe_from_description( - probe_geometry, probe_part_number, elec_ids, shank_ids=shank_ids, mux_table_string=mux_table_string + pt_metadata, probe_part_number, elec_ids, shank_ids=shank_ids, mux_info=mux_info ) chans_saved = get_saved_channel_indices_from_openephys_settings(settings_file, stream_name=stream_name) From 420c616a0fd39b2a86864017ecb48d2f79af2689 Mon Sep 17 00:00:00 2001 From: Heberto Mayorquin Date: Wed, 19 Nov 2025 20:46:00 -0600 Subject: [PATCH 05/15] comparison --- src/probeinterface/neuropixels_tools.py | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index c4dea25..e4f1962 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -1095,24 +1095,15 @@ def read_openephys( pt_metadata, _, mux_info = get_probe_metadata_from_probe_features(probe_features, probe_part_number) if selected_electrodes is not None: - selected_electrodes_values = selected_electrodes.attrib.values() + # ===== Path 1: Build full probe from probe_part_number and slice with SELECTED_ELECTRODES ===== - num_shank = pt_metadata["num_shanks"] - contact_per_shank = pt_metadata["cols_per_shank"] * pt_metadata["rows_per_shank"] + # Step 1: Build full probe with all contacts + full_probe = build_neuropixels_probe(probe_part_number) - if num_shank == 1: - elec_ids = np.arange(contact_per_shank, dtype=int) - shank_ids = None - else: - elec_ids = np.concatenate([np.arange(contact_per_shank, dtype=int) for i in range(num_shank)]) - shank_ids = np.concatenate([np.zeros(contact_per_shank, dtype=int) + i for i in range(num_shank)]) - - full_probe = _make_npx_probe_from_description( - pt_metadata, probe_part_number, elec_ids, shank_ids, mux_info=mux_info - ) - - selected_electrode_indices = [int(electrode_index) for electrode_index in selected_electrodes_values] + # Step 2: Get selected electrode indices from XML + selected_electrode_indices = [int(electrode_index) for electrode_index in selected_electrodes.attrib.values()] + # Step 3: Slice probe to selected electrodes sliced_probe = full_probe.get_slice(selection=selected_electrode_indices) np_probe_dict = { @@ -1123,6 +1114,8 @@ def read_openephys( "probe": sliced_probe, } else: + # ===== Path 2 (Legacy): Build probe from ELECTRODE_XPOS/YPOS fields ===== + # This path is kept unchanged for backward compatibility with older OpenEphys versions channel_names = np.array(list(channels.attrib.keys())) channel_ids = np.array([int(ch[2:]) for ch in channel_names]) From 8c0f1876e5acecd14beb8542ca5c2ffe63c5182f Mon Sep 17 00:00:00 2001 From: Heberto Mayorquin Date: Wed, 19 Nov 2025 22:25:12 -0600 Subject: [PATCH 06/15] doc improvements --- src/probeinterface/neuropixels_tools.py | 104 +++++++++++++----------- 1 file changed, 55 insertions(+), 49 deletions(-) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index e4f1962..25b96c0 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -353,10 +353,11 @@ def build_neuropixels_probe(probe_part_number: str) -> Probe: """ Build a Neuropixels probe with all possible contacts from the probe part number. - This function constructs a complete probe geometry including all physical contacts, - MUX table information, and probe-level metadata. The resulting probe contains ALL - electrodes (e.g., 960 for NP1.0, 1280 for NP2.0), not just the subset that might - be recorded in an actual experiment. + This function constructs a complete probe geometry based on IMEC manufacturer specifications + sourced from Bill Karsh's ProbeTable repository (https://github.com/billkarsh/ProbeTable). + The specifications include contact positions, electrode dimensions, shank geometry, MUX routing + tables, and ADC configurations. The resulting probe contains ALL electrodes (e.g., 960 for + NP1.0, 1280 for NP2.0), not just the subset that might be recorded in an actual experiment. Parameters ---------- @@ -681,21 +682,29 @@ def write_imro(file: str | Path, probe: Probe): def read_spikeglx(file: str | Path) -> Probe: """ - Read probe position for the meta file generated by SpikeGLX + Read probe geometry and configuration from a SpikeGLX metadata file. - See http://billkarsh.github.io/SpikeGLX/#metadata-guides for implementation. - The x_pitch/y_pitch/width are set automatically depending on the NP version. - - The shape is auto generated as a shank. + This function reconstructs the probe used in a recording by: + 1. Reading the probe part number from metadata + 2. Building the full probe geometry from manufacturer specifications + 3. Slicing to the electrodes selected in the IMRO table + 4. Further slicing to channels actually saved to disk (if subset was saved) + 5. Adding recording-specific annotations + 6. Add wiring (device channel indices) Parameters ---------- file : Path or str - The .meta file path + Path to the SpikeGLX .meta file Returns ------- - probe : Probe object + probe : Probe + Probe object with geometry, contact annotations, and device channel mapping + + See Also + -------- + http://billkarsh.github.io/SpikeGLX/#metadata-guides """ @@ -703,43 +712,37 @@ def read_spikeglx(file: str | Path) -> Probe: assert meta_file.suffix == ".meta", "'meta_file' should point to the .meta SpikeGLX file" meta = parse_spikeglx_meta(meta_file) - assert "imroTbl" in meta, "Could not find imroTbl field in meta file!" - imro_table = meta["imroTbl"] - # read serial number - imDatPrb_serial_number = meta.get("imDatPrb_sn", None) - if imDatPrb_serial_number is None: # this is for Phase3A - imDatPrb_serial_number = meta.get("imProbeSN", None) - - # read other metadata + # ===== 1. Extract probe part number from metadata ===== imDatPrb_pn = meta.get("imDatPrb_pn", None) - imDatPrb_port = meta.get("imDatPrb_port", None) - imDatPrb_slot = meta.get("imDatPrb_slot", None) - imDatPrb_part_number = meta.get("imDatPrb_pn", None) - - # Only Phase3a probe has "imProbeOpt". Map this to NP10101. + # Only Phase3a probe has "imProbeOpt". Map this to NP1010. if meta.get("imProbeOpt") is not None: imDatPrb_pn = "NP1010" - # ===== Step 1: Build full probe with all contacts ===== - full_probe = build_neuropixels_probe(imDatPrb_pn) + # ===== 2. Build full probe with all possible contacts ===== + # This creates the complete probe geometry (e.g., 960 contacts for NP1.0) + # based on manufacturer specifications + full_probe = build_neuropixels_probe(probe_part_number=imDatPrb_pn) - # ===== Step 2: Parse IMRO table to get active electrode IDs and settings ===== + # ===== 3. Parse IMRO table to extract recorded electrodes and acquisition settings ===== + # The IMRO table specifies which electrodes were selected for recording (e.g., 384 of 960), + # plus their acquisition settings (gains, references, filters) + imro_table = meta["imroTbl"] probe_type_num_chans, *imro_table_values_list, _ = imro_table.strip().split(")") # probe_type_num_chans looks like f"({probe_type},{num_chans}" probe_type = probe_type_num_chans.split(",")[0][1:] probe_features = _load_np_probe_features() - _, fields, _ = get_probe_metadata_from_probe_features(probe_features, imDatPrb_pn) + _, imro_fields, _ = get_probe_metadata_from_probe_features(probe_features, imDatPrb_pn) # Parse IMRO table values - contact_info = {k: [] for k in fields} + contact_info = {k: [] for k in imro_fields} for field_values_str in imro_table_values_list: # Imro table values look like '(value, value, value, ... ' # Split them by space to get int('value'), int('value'), int('value'), ...) values = tuple(map(int, field_values_str[1:].split(" "))) - for field, field_value in zip(fields, values): + for field, field_value in zip(imro_fields, values): contact_info[field].append(field_value) # Extract electrode IDs from IMRO table @@ -754,8 +757,8 @@ def read_spikeglx(file: str | Path) -> Probe: banks = np.array(contact_info[bank_key]) elec_ids = banks * 384 + channel_ids - # ===== Step 3: Slice full probe to active electrodes ===== - # Find indices in full probe that match the active electrode IDs from IMRO + # ===== 4. Slice full probe to IMRO-selected electrodes ===== + # Match the electrode IDs from IMRO table to contacts in the full probe keep_indices = [] for elec_id in elec_ids: # For multi-shank probes, we need to check if shank info is in contact_info @@ -773,35 +776,38 @@ def read_spikeglx(file: str | Path) -> Probe: probe = full_probe.get_slice(np.array(keep_indices, dtype=int)) - # ===== Step 4: Add IMRO-specific annotations ===== - # scalar annotations + # Add IMRO-specific contact annotations (acquisition settings) probe.annotate(probe_type=probe_type) - - # vector annotations vector_properties = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt") - vector_properties_available = {} for k, v in contact_info.items(): if (k in vector_properties) and (len(v) > 0): - # convert to ProbeInterface naming for backwards compatibility vector_properties_available[imro_field_to_pi_field.get(k)] = v - probe.annotate_contacts(**vector_properties_available) - # add serial number and other annotations + # ===== 5. Slice to saved channels (if subset was saved) ===== + # This is DIFFERENT from IMRO selection: IMRO selects which electrodes to acquire, + # but SpikeGLX can optionally save only a subset of acquired channels to reduce file size. + # For example: IMRO selects 384 electrodes, but only 300 are saved to disk. + saved_chans = get_saved_channel_indices_from_spikeglx_meta(meta_file) + saved_chans = saved_chans[saved_chans < probe.get_contact_count()] # Remove SYS channels + if saved_chans.size != probe.get_contact_count(): + probe = probe.get_slice(saved_chans) + + # ===== 6. Add recording-specific annotations ===== + # These annotations identify the physical probe instance and recording setup + imDatPrb_serial_number = meta.get("imDatPrb_sn") or meta.get("imProbeSN") # Phase3A uses imProbeSN + imDatPrb_port = meta.get("imDatPrb_port", None) + imDatPrb_slot = meta.get("imDatPrb_slot", None) probe.annotate(serial_number=imDatPrb_serial_number) - probe.annotate(part_number=imDatPrb_part_number) + probe.annotate(part_number=imDatPrb_pn) probe.annotate(port=imDatPrb_port) probe.annotate(slot=imDatPrb_slot) - # sometimes we need to slice the probe when not all channels are saved - saved_chans = get_saved_channel_indices_from_spikeglx_meta(meta_file) - # remove the SYS chans - saved_chans = saved_chans[saved_chans < probe.get_contact_count()] - if saved_chans.size != probe.get_contact_count(): - # slice if needed - probe = probe.get_slice(saved_chans) - # wire it + # ===== 7. Set device channel indices (wiring) ===== + # Device channel indices map probe contacts to data file channels. + # After all slicing, channels are numbered 0, 1, 2, ... N-1 in the order they appear + # in the binary data file, so we wire them sequentially. probe.set_device_channel_indices(np.arange(probe.get_contact_count())) return probe From ae14e2e1bdddd66421c2826869d6435f3dc4b0eb Mon Sep 17 00:00:00 2001 From: Heberto Mayorquin Date: Wed, 19 Nov 2025 22:36:00 -0600 Subject: [PATCH 07/15] docs --- src/probeinterface/neuropixels_tools.py | 55 ++++++++++++++----------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index 25b96c0..c446523 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -745,36 +745,43 @@ def read_spikeglx(file: str | Path) -> Probe: for field, field_value in zip(imro_fields, values): contact_info[field].append(field_value) - # Extract electrode IDs from IMRO table - channel_ids = np.array(contact_info["channel"]) + # Convert IMRO channel numbers to physical electrode IDs + # The IMRO table specifies which electrodes were recorded, but different probe types + # encode this information differently: + readout_channel_ids = np.array(contact_info["channel"]) # 0-383 for NP1.0 + if "electrode" in contact_info: - elec_ids = np.array(contact_info["electrode"]) + # NP2.0 and some probes directly specify the physical electrode ID + physical_electrode_ids = np.array(contact_info["electrode"]) else: - if contact_info.get("bank") is not None: - bank_key = "bank" - elif contact_info.get("bank_mask") is not None: - bank_key = "bank_mask" - banks = np.array(contact_info[bank_key]) - elec_ids = banks * 384 + channel_ids + # NP1.0 uses banks to encode electrode position + # Physical electrode ID = bank * 384 + channel + # (e.g., bank 0, channel 0 → electrode 0; bank 1, channel 0 → electrode 384) + bank_key = "bank" if "bank" in contact_info else "bank_mask" + bank_indices = np.array(contact_info[bank_key]) + physical_electrode_ids = bank_indices * 384 + readout_channel_ids # ===== 4. Slice full probe to IMRO-selected electrodes ===== - # Match the electrode IDs from IMRO table to contacts in the full probe - keep_indices = [] - for elec_id in elec_ids: - # For multi-shank probes, we need to check if shank info is in contact_info + # The full probe has all electrodes (e.g., 960 for NP1.0). We need to find which + # indices in the full probe array correspond to the electrodes selected in the IMRO table. + selected_contact_indices = [] + + for idx, electrode_id in enumerate(physical_electrode_ids): + # Build the contact ID string that matches the full probe's contact_ids array if "shank" in contact_info: - # Get the shank for this electrode from IMRO table - shank_idx = contact_info["shank"][len(keep_indices)] - contact_id = f"s{shank_idx}e{elec_id}" + # Multi-shank probes: contact ID = "s{shank}e{electrode}" + shank_id = contact_info["shank"][idx] + contact_id_str = f"s{shank_id}e{electrode_id}" else: - contact_id = f"e{elec_id}" + # Single-shank probes: contact ID = "e{electrode}" + contact_id_str = f"e{electrode_id}" - # Search for this contact ID in the full probe - matching_indices = np.where(full_probe.contact_ids == contact_id)[0] - if len(matching_indices) > 0: - keep_indices.append(matching_indices[0]) + # Find where this contact appears in the full probe + full_probe_index = np.where(full_probe.contact_ids == contact_id_str)[0] + if len(full_probe_index) > 0: + selected_contact_indices.append(full_probe_index[0]) - probe = full_probe.get_slice(np.array(keep_indices, dtype=int)) + probe = full_probe.get_slice(np.array(selected_contact_indices, dtype=int)) # Add IMRO-specific contact annotations (acquisition settings) probe.annotate(probe_type=probe_type) @@ -805,9 +812,7 @@ def read_spikeglx(file: str | Path) -> Probe: probe.annotate(slot=imDatPrb_slot) # ===== 7. Set device channel indices (wiring) ===== - # Device channel indices map probe contacts to data file channels. - # After all slicing, channels are numbered 0, 1, 2, ... N-1 in the order they appear - # in the binary data file, so we wire them sequentially. + # I am unsure why are we are doing this. If someone knows please document it here. probe.set_device_channel_indices(np.arange(probe.get_contact_count())) return probe From 11846dadb31aed429c0e5a1badfde3a6850def9b Mon Sep 17 00:00:00 2001 From: Heberto Mayorquin Date: Wed, 19 Nov 2025 22:46:27 -0600 Subject: [PATCH 08/15] restricto to spikeglx only for simpliity --- src/probeinterface/neuropixels_tools.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index c446523..a89da3e 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -1106,15 +1106,24 @@ def read_openephys( pt_metadata, _, mux_info = get_probe_metadata_from_probe_features(probe_features, probe_part_number) if selected_electrodes is not None: - # ===== Path 1: Build full probe from probe_part_number and slice with SELECTED_ELECTRODES ===== + selected_electrodes_values = selected_electrodes.attrib.values() - # Step 1: Build full probe with all contacts - full_probe = build_neuropixels_probe(probe_part_number) + num_shank = pt_metadata["num_shanks"] + contact_per_shank = pt_metadata["cols_per_shank"] * pt_metadata["rows_per_shank"] - # Step 2: Get selected electrode indices from XML - selected_electrode_indices = [int(electrode_index) for electrode_index in selected_electrodes.attrib.values()] + if num_shank == 1: + elec_ids = np.arange(contact_per_shank, dtype=int) + shank_ids = None + else: + elec_ids = np.concatenate([np.arange(contact_per_shank, dtype=int) for i in range(num_shank)]) + shank_ids = np.concatenate([np.zeros(contact_per_shank, dtype=int) + i for i in range(num_shank)]) + + full_probe = _make_npx_probe_from_description( + pt_metadata, probe_part_number, elec_ids, shank_ids, mux_info=mux_info + ) + + selected_electrode_indices = [int(electrode_index) for electrode_index in selected_electrodes_values] - # Step 3: Slice probe to selected electrodes sliced_probe = full_probe.get_slice(selection=selected_electrode_indices) np_probe_dict = { @@ -1125,8 +1134,6 @@ def read_openephys( "probe": sliced_probe, } else: - # ===== Path 2 (Legacy): Build probe from ELECTRODE_XPOS/YPOS fields ===== - # This path is kept unchanged for backward compatibility with older OpenEphys versions channel_names = np.array(list(channels.attrib.keys())) channel_ids = np.array([int(ch[2:]) for ch in channel_names]) From 36be31795aaa7e76d919a7a865146cffcfd1278a Mon Sep 17 00:00:00 2001 From: Heberto Mayorquin Date: Wed, 19 Nov 2025 22:51:18 -0600 Subject: [PATCH 09/15] final --- src/probeinterface/neuropixels_tools.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index a89da3e..e44f6ca 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -381,7 +381,6 @@ def build_neuropixels_probe(probe_part_number: str) -> Probe: # ===== 1. Load configuration ===== probe_features = _load_np_probe_features() probe_spec_dict = probe_features["neuropixels_probes"][probe_part_number] - mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] # ===== 2. Calculate electrode IDs and shank IDs ===== num_shanks = int(probe_spec_dict["num_shanks"]) @@ -447,6 +446,9 @@ def build_neuropixels_probe(probe_part_number: str) -> Probe: contour += list(polygon + shank_shift) # Apply contour shift to align with contact positions + # This constant (11 μm) represents the vertical distance from the center of the bottommost + # electrode to the top of the shank tip. This is a geometric constant for Neuropixels probes + # that is not currently available in the ProbeTable specifications. middle_of_bottommost_electrode_to_top_of_shank_tip = 11 contour_shift = np.array([-odd_offset, -middle_of_bottommost_electrode_to_top_of_shank_tip]) contour = np.array(contour) + contour_shift @@ -470,8 +472,18 @@ def build_neuropixels_probe(probe_part_number: str) -> Probe: ) # ===== 8. Add MUX table annotations ===== + mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] if mux_table_string is not None: - num_adcs, num_channels_per_adc, mux_table = make_mux_table_array(mux_table_string) + # Parse MUX table string: (num_adcs,num_channels_per_adc)(int int ...)(int int ...)... + adc_info = mux_table_string.split(")(")[0] + split_mux = mux_table_string.split(")(")[1:] + num_adcs, num_channels_per_adc = map(int, adc_info[1:].split(",")) + adc_groups_list = [ + np.array(each_mux.replace("(", "").replace(")", "").split(" ")).astype("int") for each_mux in split_mux + ] + mux_table = np.transpose(np.array(adc_groups_list)) + + # Map contacts to ADC groups and sample order num_contacts = positions.shape[0] adc_groups = np.zeros(num_contacts, dtype="int64") adc_sample_order = np.zeros(num_contacts, dtype="int64") @@ -479,6 +491,7 @@ def build_neuropixels_probe(probe_part_number: str) -> Probe: adc_groups_per_adc = adc_groups_per_adc[adc_groups_per_adc < num_contacts] adc_groups[adc_groups_per_adc] = adc_index adc_sample_order[adc_groups_per_adc] = np.arange(len(adc_groups_per_adc)) + probe.annotate(num_adcs=num_adcs) probe.annotate(num_channels_per_adc=num_channels_per_adc) probe.annotate_contacts(adc_group=adc_groups) From f200342f57d12341f59d222d25f4fd4611b38481 Mon Sep 17 00:00:00 2001 From: Heberto Mayorquin Date: Thu, 20 Nov 2025 00:05:06 -0600 Subject: [PATCH 10/15] imro parsing --- src/probeinterface/neuropixels_tools.py | 113 ++++++++++++++---------- 1 file changed, 67 insertions(+), 46 deletions(-) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index e44f6ca..976fad3 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -693,6 +693,51 @@ def write_imro(file: str | Path, probe: Probe): ## +def _parse_imro_string(imro_table_string: str, probe_part_number: str) -> dict: + """ + Parse IMRO (Imec ReadOut) table string into structured per-channel data. + + IMRO format: "(probe_type,num_chans)(ch0 bank0 ref0 ...)(ch1 bank1 ref1 ...)..." + Example: "(0,384)(0 1 0 500 250 1)(1 0 0 500 250 1)..." + + Note: The IMRO header contains a probe_type field (e.g., "0", "21", "24"), which is + a numeric format version identifier that specifies which IMRO table structure was used. + Different probe generations use different IMRO formats. This is a file format detail, + not a physical probe property. + + Parameters + ---------- + imro_table_string : str + IMRO table string from SpikeGLX metadata file + probe_part_number : str + Probe part number (e.g., "NP1000", "NP2000") + + Returns + ------- + imro_per_channel : dict + Dictionary where each key maps to a list of values (one per channel). + Keys are IMRO fields like "channel", "bank", "electrode", "ap_gain", etc. + Example: {"channel": [0,1,2,...], "bank": [1,0,0,...], "ap_gain": [500,500,...]} + """ + # Get IMRO field format from catalogue + probe_features = _load_np_probe_features() + probe_spec = probe_features["neuropixels_probes"][probe_part_number] + imro_format = probe_spec["imro_table_format_type"] + imro_fields_string = probe_features["z_imro_formats"][imro_format + "_elm_flds"] + imro_fields = tuple(imro_fields_string.replace("(", "").replace(")", "").split(" ")) + + # Parse IMRO table values into per-channel data + # Skip the header "(probe_type,num_chans)" and trailing empty string + _, *imro_table_values_list, _ = imro_table_string.strip().split(")") + imro_per_channel = {k: [] for k in imro_fields} + for field_values_str in imro_table_values_list: + values = tuple(map(int, field_values_str[1:].split(" "))) + for field, field_value in zip(imro_fields, values): + imro_per_channel[field].append(field_value) + + return imro_per_channel + + def read_spikeglx(file: str | Path) -> Probe: """ Read probe geometry and configuration from a SpikeGLX metadata file. @@ -739,73 +784,49 @@ def read_spikeglx(file: str | Path) -> Probe: full_probe = build_neuropixels_probe(probe_part_number=imDatPrb_pn) # ===== 3. Parse IMRO table to extract recorded electrodes and acquisition settings ===== - # The IMRO table specifies which electrodes were selected for recording (e.g., 384 of 960), - # plus their acquisition settings (gains, references, filters) - imro_table = meta["imroTbl"] - probe_type_num_chans, *imro_table_values_list, _ = imro_table.strip().split(")") - - # probe_type_num_chans looks like f"({probe_type},{num_chans}" - probe_type = probe_type_num_chans.split(",")[0][1:] - - probe_features = _load_np_probe_features() - _, imro_fields, _ = get_probe_metadata_from_probe_features(probe_features, imDatPrb_pn) - - # Parse IMRO table values - contact_info = {k: [] for k in imro_fields} - for field_values_str in imro_table_values_list: # Imro table values look like '(value, value, value, ... ' - # Split them by space to get int('value'), int('value'), int('value'), ...) - values = tuple(map(int, field_values_str[1:].split(" "))) - for field, field_value in zip(imro_fields, values): - contact_info[field].append(field_value) - - # Convert IMRO channel numbers to physical electrode IDs - # The IMRO table specifies which electrodes were recorded, but different probe types - # encode this information differently: - readout_channel_ids = np.array(contact_info["channel"]) # 0-383 for NP1.0 - - if "electrode" in contact_info: - # NP2.0 and some probes directly specify the physical electrode ID - physical_electrode_ids = np.array(contact_info["electrode"]) + # IMRO = Imec ReadOut (the configuration table format from IMEC manufacturer) + # Specifies which electrodes were selected for recording (e.g., 384 of 960) plus their + # acquisition settings (gains, references, filters). See: https://billkarsh.github.io/SpikeGLX/help/imroTables/ + imro_table_string = meta["imroTbl"] + imro_per_channel = _parse_imro_string(imro_table_string, imDatPrb_pn) + + # ===== 4. Get active electrodes from IMRO data ===== + # Convert IMRO channel numbers to physical electrode IDs. Different probe types encode + # electrode selection differently: NP1.0 uses banks, NP2.0+ uses direct electrode IDs. + if "electrode" in imro_per_channel: + # NP2.0+: Direct electrode addressing + physical_electrode_ids = np.array(imro_per_channel["electrode"]) else: - # NP1.0 uses banks to encode electrode position - # Physical electrode ID = bank * 384 + channel - # (e.g., bank 0, channel 0 → electrode 0; bank 1, channel 0 → electrode 384) - bank_key = "bank" if "bank" in contact_info else "bank_mask" - bank_indices = np.array(contact_info[bank_key]) + # NP1.0: Bank-based addressing (physical_electrode_id = bank * 384 + channel) + readout_channel_ids = np.array(imro_per_channel["channel"]) + bank_key = "bank" if "bank" in imro_per_channel else "bank_mask" + bank_indices = np.array(imro_per_channel[bank_key]) physical_electrode_ids = bank_indices * 384 + readout_channel_ids - # ===== 4. Slice full probe to IMRO-selected electrodes ===== - # The full probe has all electrodes (e.g., 960 for NP1.0). We need to find which - # indices in the full probe array correspond to the electrodes selected in the IMRO table. + # ===== 5. Slice full probe to active electrodes ===== selected_contact_indices = [] - for idx, electrode_id in enumerate(physical_electrode_ids): - # Build the contact ID string that matches the full probe's contact_ids array - if "shank" in contact_info: - # Multi-shank probes: contact ID = "s{shank}e{electrode}" - shank_id = contact_info["shank"][idx] + if "shank" in imro_per_channel: + shank_id = imro_per_channel["shank"][idx] contact_id_str = f"s{shank_id}e{electrode_id}" else: - # Single-shank probes: contact ID = "e{electrode}" contact_id_str = f"e{electrode_id}" - # Find where this contact appears in the full probe full_probe_index = np.where(full_probe.contact_ids == contact_id_str)[0] if len(full_probe_index) > 0: selected_contact_indices.append(full_probe_index[0]) probe = full_probe.get_slice(np.array(selected_contact_indices, dtype=int)) - # Add IMRO-specific contact annotations (acquisition settings) - probe.annotate(probe_type=probe_type) + # ===== 6. Store IMRO properties (acquisition settings) as annotations ===== vector_properties = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt") vector_properties_available = {} - for k, v in contact_info.items(): + for k, v in imro_per_channel.items(): if (k in vector_properties) and (len(v) > 0): vector_properties_available[imro_field_to_pi_field.get(k)] = v probe.annotate_contacts(**vector_properties_available) - # ===== 5. Slice to saved channels (if subset was saved) ===== + # ===== 7. Slice to saved channels (if subset was saved) ===== # This is DIFFERENT from IMRO selection: IMRO selects which electrodes to acquire, # but SpikeGLX can optionally save only a subset of acquired channels to reduce file size. # For example: IMRO selects 384 electrodes, but only 300 are saved to disk. From d527a81442d24af7a71d50a2d076eae37ceaa76d Mon Sep 17 00:00:00 2001 From: Heberto Mayorquin Date: Thu, 20 Nov 2025 00:53:03 -0600 Subject: [PATCH 11/15] improvements to imro reading and channel selection --- src/probeinterface/neuropixels_tools.py | 102 ++++++++++++++++-------- 1 file changed, 68 insertions(+), 34 deletions(-) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index 976fad3..3c37990 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -416,10 +416,8 @@ def build_neuropixels_probe(probe_part_number: str) -> Probe: positions = np.stack((x_pos, y_pos), axis=1) # ===== 4. Calculate contact IDs ===== - if shank_ids is not None: - contact_ids = [f"s{shank_id}e{elec_id}" for shank_id, elec_id in zip(shank_ids, elec_ids)] - else: - contact_ids = [f"e{elec_id}" for elec_id in elec_ids] + shank_ids_iter = shank_ids if shank_ids is not None else [None] * len(elec_ids) + contact_ids = [_build_canonical_contact_id(elec_id, shank_id) for shank_id, elec_id in zip(shank_ids_iter, elec_ids)] # ===== 5. Create Probe object and set contacts ===== probe = Probe(ndim=2, si_units="um", model_name=probe_part_number, manufacturer="imec") @@ -693,6 +691,39 @@ def write_imro(file: str | Path, probe: Probe): ## +def _build_canonical_contact_id(electrode_id: int, shank_id: int | None = None) -> str: + """ + Build the canonical contact ID string for a Neuropixels electrode. + + This establishes the standard naming convention used throughout probeinterface + for Neuropixels contact identification. + + Parameters + ---------- + electrode_id : int + Physical electrode ID on the probe (e.g., 0-959 for NP1.0) + shank_id : int or None, default: None + Shank ID for multi-shank probes. If None, assumes single-shank probe. + + Returns + ------- + contact_id : str + Canonical contact ID string, either "e{electrode_id}" for single-shank + or "s{shank_id}e{electrode_id}" for multi-shank probes. + + Examples + -------- + >>> _build_canonical_contact_id(123) + 'e123' + >>> _build_canonical_contact_id(123, shank_id=0) + 's0e123' + """ + if shank_id is not None: + return f"s{shank_id}e{electrode_id}" + else: + return f"e{electrode_id}" + + def _parse_imro_string(imro_table_string: str, probe_part_number: str) -> dict: """ Parse IMRO (Imec ReadOut) table string into structured per-channel data. @@ -717,7 +748,10 @@ def _parse_imro_string(imro_table_string: str, probe_part_number: str) -> dict: imro_per_channel : dict Dictionary where each key maps to a list of values (one per channel). Keys are IMRO fields like "channel", "bank", "electrode", "ap_gain", etc. - Example: {"channel": [0,1,2,...], "bank": [1,0,0,...], "ap_gain": [500,500,...]} + The "electrode" key always contains physical electrode IDs (0-959 for NP1.0, etc.). + For NP2.0+: electrode IDs come directly from IMRO data. + For NP1.0: electrode IDs are computed as bank * 384 + channel. + Example: {"channel": [0,1,2,...], "bank": [1,0,0,...], "electrode": [384,1,2,...], "ap_gain": [500,500,...]} """ # Get IMRO field format from catalogue probe_features = _load_np_probe_features() @@ -735,6 +769,15 @@ def _parse_imro_string(imro_table_string: str, probe_part_number: str) -> dict: for field, field_value in zip(imro_fields, values): imro_per_channel[field].append(field_value) + # Ensure "electrode" key always exists with physical electrode IDs + # Different probe types encode electrode selection differently + if "electrode" not in imro_per_channel: + # NP1.0: Bank-based addressing (physical_electrode_id = bank * 384 + channel) + readout_channel_ids = np.array(imro_per_channel["channel"]) + bank_key = "bank" if "bank" in imro_per_channel else "bank_mask" + bank_indices = np.array(imro_per_channel[bank_key]) + imro_per_channel["electrode"] = (bank_indices * 384 + readout_channel_ids).tolist() + return imro_per_channel @@ -790,41 +833,32 @@ def read_spikeglx(file: str | Path) -> Probe: imro_table_string = meta["imroTbl"] imro_per_channel = _parse_imro_string(imro_table_string, imDatPrb_pn) - # ===== 4. Get active electrodes from IMRO data ===== - # Convert IMRO channel numbers to physical electrode IDs. Different probe types encode - # electrode selection differently: NP1.0 uses banks, NP2.0+ uses direct electrode IDs. - if "electrode" in imro_per_channel: - # NP2.0+: Direct electrode addressing - physical_electrode_ids = np.array(imro_per_channel["electrode"]) - else: - # NP1.0: Bank-based addressing (physical_electrode_id = bank * 384 + channel) - readout_channel_ids = np.array(imro_per_channel["channel"]) - bank_key = "bank" if "bank" in imro_per_channel else "bank_mask" - bank_indices = np.array(imro_per_channel[bank_key]) - physical_electrode_ids = bank_indices * 384 + readout_channel_ids + # ===== 4. Build contact IDs for active electrodes ===== + # Convert physical electrode IDs to probeinterface canonical contact ID strings + imro_electrode = imro_per_channel["electrode"] + imro_shank = imro_per_channel.get("shank", [None] * len(imro_electrode)) + active_contact_ids = [ + _build_canonical_contact_id(elec_id, shank_id) + for shank_id, elec_id in zip(imro_shank, imro_electrode) + ] # ===== 5. Slice full probe to active electrodes ===== - selected_contact_indices = [] - for idx, electrode_id in enumerate(physical_electrode_ids): - if "shank" in imro_per_channel: - shank_id = imro_per_channel["shank"][idx] - contact_id_str = f"s{shank_id}e{electrode_id}" - else: - contact_id_str = f"e{electrode_id}" - - full_probe_index = np.where(full_probe.contact_ids == contact_id_str)[0] - if len(full_probe_index) > 0: - selected_contact_indices.append(full_probe_index[0]) + # Find indices where full probe contact IDs match the active contact IDs + full_probe_contact_ids = np.array(full_probe.contact_ids) + active_contact_ids_array = np.array(active_contact_ids) + mask = np.isin(full_probe_contact_ids, active_contact_ids_array) + selected_contact_indices = np.where(mask)[0] probe = full_probe.get_slice(np.array(selected_contact_indices, dtype=int)) # ===== 6. Store IMRO properties (acquisition settings) as annotations ===== - vector_properties = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt") - vector_properties_available = {} - for k, v in imro_per_channel.items(): - if (k in vector_properties) and (len(v) > 0): - vector_properties_available[imro_field_to_pi_field.get(k)] = v - probe.annotate_contacts(**vector_properties_available) + # Map IMRO field names to probeinterface field names and add as contact annotations + imro_properties_to_add = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt") + probe.annotate_contacts(**{ + imro_field_to_pi_field.get(k): v + for k, v in imro_per_channel.items() + if k in imro_properties_to_add and len(v) > 0 + }) # ===== 7. Slice to saved channels (if subset was saved) ===== # This is DIFFERENT from IMRO selection: IMRO selects which electrodes to acquire, From 8a0fff6de8c3760f9190dc87643efd257cef47eb Mon Sep 17 00:00:00 2001 From: Heberto Mayorquin Date: Thu, 20 Nov 2025 01:02:17 -0600 Subject: [PATCH 12/15] fix test --- src/probeinterface/neuropixels_tools.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index 3c37990..eccf346 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -843,11 +843,9 @@ def read_spikeglx(file: str | Path) -> Probe: ] # ===== 5. Slice full probe to active electrodes ===== - # Find indices where full probe contact IDs match the active contact IDs - full_probe_contact_ids = np.array(full_probe.contact_ids) - active_contact_ids_array = np.array(active_contact_ids) - mask = np.isin(full_probe_contact_ids, active_contact_ids_array) - selected_contact_indices = np.where(mask)[0] + # Find indices of active contacts in the full probe, preserving IMRO order + contact_id_to_index = {contact_id: idx for idx, contact_id in enumerate(full_probe.contact_ids)} + selected_contact_indices = np.array([contact_id_to_index[contact_id] for contact_id in active_contact_ids]) probe = full_probe.get_slice(np.array(selected_contact_indices, dtype=int)) From aa3e2aa9fd97a7250e6ffd0e2c56ce6cadb2c05f Mon Sep 17 00:00:00 2001 From: Heberto Mayorquin Date: Thu, 20 Nov 2025 01:05:58 -0600 Subject: [PATCH 13/15] improve --- src/probeinterface/neuropixels_tools.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index eccf346..bd63fdc 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -847,16 +847,17 @@ def read_spikeglx(file: str | Path) -> Probe: contact_id_to_index = {contact_id: idx for idx, contact_id in enumerate(full_probe.contact_ids)} selected_contact_indices = np.array([contact_id_to_index[contact_id] for contact_id in active_contact_ids]) - probe = full_probe.get_slice(np.array(selected_contact_indices, dtype=int)) + probe = full_probe.get_slice(selected_contact_indices) # ===== 6. Store IMRO properties (acquisition settings) as annotations ===== # Map IMRO field names to probeinterface field names and add as contact annotations imro_properties_to_add = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt") - probe.annotate_contacts(**{ - imro_field_to_pi_field.get(k): v - for k, v in imro_per_channel.items() - if k in imro_properties_to_add and len(v) > 0 - }) + annotations = {} + for imro_field, values in imro_per_channel.items(): + if imro_field in imro_properties_to_add and len(values) > 0: + pi_field = imro_field_to_pi_field.get(imro_field) + annotations[pi_field] = values + probe.annotate_contacts(**annotations) # ===== 7. Slice to saved channels (if subset was saved) ===== # This is DIFFERENT from IMRO selection: IMRO selects which electrodes to acquire, From 2c77e9b7da7097fc2aaa4eddc69736701a308bbd Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 20 Nov 2025 07:08:21 +0000 Subject: [PATCH 14/15] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/probeinterface/neuropixels_tools.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index bd63fdc..2bdceae 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -417,7 +417,9 @@ def build_neuropixels_probe(probe_part_number: str) -> Probe: # ===== 4. Calculate contact IDs ===== shank_ids_iter = shank_ids if shank_ids is not None else [None] * len(elec_ids) - contact_ids = [_build_canonical_contact_id(elec_id, shank_id) for shank_id, elec_id in zip(shank_ids_iter, elec_ids)] + contact_ids = [ + _build_canonical_contact_id(elec_id, shank_id) for shank_id, elec_id in zip(shank_ids_iter, elec_ids) + ] # ===== 5. Create Probe object and set contacts ===== probe = Probe(ndim=2, si_units="um", model_name=probe_part_number, manufacturer="imec") @@ -838,8 +840,7 @@ def read_spikeglx(file: str | Path) -> Probe: imro_electrode = imro_per_channel["electrode"] imro_shank = imro_per_channel.get("shank", [None] * len(imro_electrode)) active_contact_ids = [ - _build_canonical_contact_id(elec_id, shank_id) - for shank_id, elec_id in zip(imro_shank, imro_electrode) + _build_canonical_contact_id(elec_id, shank_id) for shank_id, elec_id in zip(imro_shank, imro_electrode) ] # ===== 5. Slice full probe to active electrodes ===== From 67ce7ca2cc37a59d40c2d9ddf6ac3f6f679efdc8 Mon Sep 17 00:00:00 2001 From: Heberto Mayorquin Date: Thu, 20 Nov 2025 01:18:36 -0600 Subject: [PATCH 15/15] simplify more --- src/probeinterface/neuropixels_tools.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index 2bdceae..3e7cd65 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -851,13 +851,14 @@ def read_spikeglx(file: str | Path) -> Probe: probe = full_probe.get_slice(selected_contact_indices) # ===== 6. Store IMRO properties (acquisition settings) as annotations ===== - # Map IMRO field names to probeinterface field names and add as contact annotations + # Filter IMRO data to only the properties we want to add as annotations imro_properties_to_add = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt") + imro_filtered = {k: v for k, v in imro_per_channel.items() if k in imro_properties_to_add and len(v) > 0} + # Map IMRO field names to probeinterface field names and add as contact annotations annotations = {} - for imro_field, values in imro_per_channel.items(): - if imro_field in imro_properties_to_add and len(values) > 0: - pi_field = imro_field_to_pi_field.get(imro_field) - annotations[pi_field] = values + for imro_field, values in imro_filtered.items(): + pi_field = imro_field_to_pi_field.get(imro_field) + annotations[pi_field] = values probe.annotate_contacts(**annotations) # ===== 7. Slice to saved channels (if subset was saved) =====