diff --git a/resources/generate_neuropixels_library.py b/resources/generate_neuropixels_library.py index b4055db..5357965 100644 --- a/resources/generate_neuropixels_library.py +++ b/resources/generate_neuropixels_library.py @@ -6,7 +6,7 @@ import numpy as np import matplotlib.pyplot as plt -from probeinterface.neuropixels_tools import _make_npx_probe_from_description, get_probe_metadata_from_probe_features +from probeinterface.neuropixels_tools import build_neuropixels_probe from probeinterface.plotting import plot_probe from probeinterface import write_probeinterface @@ -28,6 +28,8 @@ def generate_all_npx(base_folder=None): probe_part_numbers = probe_features['neuropixels_probes'].keys() + # Note: model_name here is the probe_part_number (specific SKU like "NP1000"), + # NOT the model/platform (like "Neuropixels 1.0", "Neuropixels 2.0", "Ultra", or "NHP") for model_name in probe_part_numbers: print(model_name) @@ -41,18 +43,8 @@ def generate_all_npx(base_folder=None): probe_folder = base_folder / model_name probe_folder.mkdir(exist_ok=True) - pt_metadata, _, _ = get_probe_metadata_from_probe_features(probe_features, model_name) - - num_shank = pt_metadata["num_shanks"] - contact_per_shank = pt_metadata["cols_per_shank"] * pt_metadata["rows_per_shank"] - if num_shank == 1: - elec_ids = np.arange(contact_per_shank) - shank_ids = None - else: - elec_ids = np.concatenate([np.arange(contact_per_shank) for i in range(num_shank)]) - shank_ids = np.concatenate([np.zeros(contact_per_shank) + i for i in range(num_shank)]) - - probe = _make_npx_probe_from_description(pt_metadata, model_name, elec_ids, shank_ids) + # Build the full probe with all possible contacts from the probe part number + probe = build_neuropixels_probe(model_name) # plotting fig, axs = plt.subplots(ncols=2) @@ -73,7 +65,10 @@ def generate_all_npx(base_folder=None): plot_probe(probe, ax=ax) ax.set_title("") - yp = pt_metadata["electrode_pitch_vert_um"] + # Get vertical pitch from contact positions (minimum non-zero y-distance) + positions = probe.contact_positions + y_positions = np.unique(positions[:, 1]) + yp = np.min(np.diff(y_positions)) ax.set_ylim(-yp*8, yp*13) ax.yaxis.set_visible(False) ax.spines["top"].set_visible(False) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index 23d6531..3e7cd65 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -24,7 +24,7 @@ def _load_np_probe_features(): - # this avoid loading the json several time + # this avoid loading the json several times global _np_probe_features if _np_probe_features is None: probe_features_filepath = Path(__file__).absolute().parent / Path("resources/neuropixels_probe_features.json") @@ -349,6 +349,157 @@ def _make_npx_probe_from_description(probe_description, model_name, elec_ids, sh return probe +def build_neuropixels_probe(probe_part_number: str) -> Probe: + """ + Build a Neuropixels probe with all possible contacts from the probe part number. + + This function constructs a complete probe geometry based on IMEC manufacturer specifications + sourced from Bill Karsh's ProbeTable repository (https://github.com/billkarsh/ProbeTable). + The specifications include contact positions, electrode dimensions, shank geometry, MUX routing + tables, and ADC configurations. The resulting probe contains ALL electrodes (e.g., 960 for + NP1.0, 1280 for NP2.0), not just the subset that might be recorded in an actual experiment. + + Parameters + ---------- + probe_part_number : str + Probe part number (specific SKU identifier). + Examples: "NP1000", "NP2000", "NP1010", "NP2003", "NP2004" + + Note: This is the specific SKU, not the model/platform name: + - probe_part_number is like "NP1000" (specific SKU) + - NOT like "Neuropixels 1.0" or "Neuropixels 2.0" (platform family) + + In SpikeGLX meta files, this corresponds to the `imDatPrb_pn` field. + Multiple part numbers may belong to the same platform family but have + different configurations or variants. + + Returns + ------- + probe : Probe + The complete Probe object with all contacts and metadata. + """ + # ===== 1. Load configuration ===== + probe_features = _load_np_probe_features() + probe_spec_dict = probe_features["neuropixels_probes"][probe_part_number] + + # ===== 2. Calculate electrode IDs and shank IDs ===== + num_shanks = int(probe_spec_dict["num_shanks"]) + contacts_per_shank = int(probe_spec_dict["cols_per_shank"]) * int(probe_spec_dict["rows_per_shank"]) + + if num_shanks == 1: + elec_ids = np.arange(contacts_per_shank, dtype=int) + shank_ids = None + else: + elec_ids = np.concatenate([np.arange(contacts_per_shank, dtype=int) for _ in range(num_shanks)]) + shank_ids = np.concatenate([np.zeros(contacts_per_shank, dtype=int) + i for i in range(num_shanks)]) + + # ===== 3. Calculate contact positions ===== + cols_per_shank = int(probe_spec_dict["cols_per_shank"]) + y_idx, x_idx = np.divmod(elec_ids, cols_per_shank) + + x_pitch = float(probe_spec_dict["electrode_pitch_horz_um"]) + y_pitch = float(probe_spec_dict["electrode_pitch_vert_um"]) + + even_offset = float(probe_spec_dict["even_row_horz_offset_left_edge_to_leftmost_electrode_center_um"]) + odd_offset = float(probe_spec_dict["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"]) + raw_stagger = even_offset - odd_offset + + stagger = np.mod(y_idx + 1, 2) * raw_stagger + x_pos = (x_idx * x_pitch + stagger).astype("float64") + y_pos = (y_idx * y_pitch).astype("float64") + + # Apply horizontal offset for multi-shank probes + if shank_ids is not None: + shank_pitch = float(probe_spec_dict["shank_pitch_um"]) + x_pos += np.array(shank_ids).astype(int) * shank_pitch + + positions = np.stack((x_pos, y_pos), axis=1) + + # ===== 4. Calculate contact IDs ===== + shank_ids_iter = shank_ids if shank_ids is not None else [None] * len(elec_ids) + contact_ids = [ + _build_canonical_contact_id(elec_id, shank_id) for shank_id, elec_id in zip(shank_ids_iter, elec_ids) + ] + + # ===== 5. Create Probe object and set contacts ===== + probe = Probe(ndim=2, si_units="um", model_name=probe_part_number, manufacturer="imec") + probe.description = probe_spec_dict["description"] + probe.set_contacts( + positions=positions, + shapes="square", + shank_ids=shank_ids, + shape_params={"width": float(probe_spec_dict["electrode_size_horz_direction_um"])}, + ) + probe.set_contact_ids(contact_ids) + + # ===== 6. Build probe contour and shank tips ===== + shank_width = float(probe_spec_dict["shank_width_um"]) + tip_length = float(probe_spec_dict["tip_length_um"]) + polygon = np.array(get_probe_contour_vertices(shank_width, tip_length, get_probe_length(probe_part_number))) + + # Build contour for all shanks + contour = [] + if shank_ids is not None: + shank_pitch = float(probe_spec_dict["shank_pitch_um"]) + for shank_id in range(num_shanks): + shank_shift = np.array([shank_pitch * shank_id if shank_ids is not None else 0, 0]) + contour += list(polygon + shank_shift) + + # Apply contour shift to align with contact positions + # This constant (11 μm) represents the vertical distance from the center of the bottommost + # electrode to the top of the shank tip. This is a geometric constant for Neuropixels probes + # that is not currently available in the ProbeTable specifications. + middle_of_bottommost_electrode_to_top_of_shank_tip = 11 + contour_shift = np.array([-odd_offset, -middle_of_bottommost_electrode_to_top_of_shank_tip]) + contour = np.array(contour) + contour_shift + probe.set_planar_contour(contour) + + # Calculate shank tips (polygon[2] is the tip vertex from get_probe_contour_vertices) + tip_vertex = polygon[2] + shank_tips = [] + for shank_id in range(num_shanks): + shank_shift = np.array([shank_pitch * shank_id if shank_ids is not None else 0, 0]) + shank_tip = np.array(tip_vertex) + contour_shift + shank_shift + shank_tips.append(shank_tip.tolist()) + probe.annotate(shank_tips=shank_tips) + + # ===== 7. Add metadata annotations ===== + probe.annotate( + adc_bit_depth=int(probe_spec_dict["adc_bit_depth"]), + num_readout_channels=int(probe_spec_dict["num_readout_channels"]), + ap_sample_frequency_hz=float(probe_spec_dict["ap_sample_frequency_hz"]), + lf_sample_frequency_hz=float(probe_spec_dict["lf_sample_frequency_hz"]), + ) + + # ===== 8. Add MUX table annotations ===== + mux_table_string = probe_features["z_mux_tables"][probe_spec_dict["mux_table_format_type"]] + if mux_table_string is not None: + # Parse MUX table string: (num_adcs,num_channels_per_adc)(int int ...)(int int ...)... + adc_info = mux_table_string.split(")(")[0] + split_mux = mux_table_string.split(")(")[1:] + num_adcs, num_channels_per_adc = map(int, adc_info[1:].split(",")) + adc_groups_list = [ + np.array(each_mux.replace("(", "").replace(")", "").split(" ")).astype("int") for each_mux in split_mux + ] + mux_table = np.transpose(np.array(adc_groups_list)) + + # Map contacts to ADC groups and sample order + num_contacts = positions.shape[0] + adc_groups = np.zeros(num_contacts, dtype="int64") + adc_sample_order = np.zeros(num_contacts, dtype="int64") + for adc_index, adc_groups_per_adc in enumerate(mux_table): + adc_groups_per_adc = adc_groups_per_adc[adc_groups_per_adc < num_contacts] + adc_groups[adc_groups_per_adc] = adc_index + adc_sample_order[adc_groups_per_adc] = np.arange(len(adc_groups_per_adc)) + + probe.annotate(num_adcs=num_adcs) + probe.annotate(num_channels_per_adc=num_channels_per_adc) + probe.annotate_contacts(adc_group=adc_groups) + probe.annotate_contacts(adc_sample_order=adc_sample_order) + + return probe + + def _read_imro_string(imro_str: str, imDatPrb_pn: Optional[str] = None) -> Probe: """ Parse the IMRO table when presented as a string and create a Probe object. @@ -542,23 +693,121 @@ def write_imro(file: str | Path, probe: Probe): ## -def read_spikeglx(file: str | Path) -> Probe: +def _build_canonical_contact_id(electrode_id: int, shank_id: int | None = None) -> str: + """ + Build the canonical contact ID string for a Neuropixels electrode. + + This establishes the standard naming convention used throughout probeinterface + for Neuropixels contact identification. + + Parameters + ---------- + electrode_id : int + Physical electrode ID on the probe (e.g., 0-959 for NP1.0) + shank_id : int or None, default: None + Shank ID for multi-shank probes. If None, assumes single-shank probe. + + Returns + ------- + contact_id : str + Canonical contact ID string, either "e{electrode_id}" for single-shank + or "s{shank_id}e{electrode_id}" for multi-shank probes. + + Examples + -------- + >>> _build_canonical_contact_id(123) + 'e123' + >>> _build_canonical_contact_id(123, shank_id=0) + 's0e123' + """ + if shank_id is not None: + return f"s{shank_id}e{electrode_id}" + else: + return f"e{electrode_id}" + + +def _parse_imro_string(imro_table_string: str, probe_part_number: str) -> dict: + """ + Parse IMRO (Imec ReadOut) table string into structured per-channel data. + + IMRO format: "(probe_type,num_chans)(ch0 bank0 ref0 ...)(ch1 bank1 ref1 ...)..." + Example: "(0,384)(0 1 0 500 250 1)(1 0 0 500 250 1)..." + + Note: The IMRO header contains a probe_type field (e.g., "0", "21", "24"), which is + a numeric format version identifier that specifies which IMRO table structure was used. + Different probe generations use different IMRO formats. This is a file format detail, + not a physical probe property. + + Parameters + ---------- + imro_table_string : str + IMRO table string from SpikeGLX metadata file + probe_part_number : str + Probe part number (e.g., "NP1000", "NP2000") + + Returns + ------- + imro_per_channel : dict + Dictionary where each key maps to a list of values (one per channel). + Keys are IMRO fields like "channel", "bank", "electrode", "ap_gain", etc. + The "electrode" key always contains physical electrode IDs (0-959 for NP1.0, etc.). + For NP2.0+: electrode IDs come directly from IMRO data. + For NP1.0: electrode IDs are computed as bank * 384 + channel. + Example: {"channel": [0,1,2,...], "bank": [1,0,0,...], "electrode": [384,1,2,...], "ap_gain": [500,500,...]} """ - Read probe position for the meta file generated by SpikeGLX + # Get IMRO field format from catalogue + probe_features = _load_np_probe_features() + probe_spec = probe_features["neuropixels_probes"][probe_part_number] + imro_format = probe_spec["imro_table_format_type"] + imro_fields_string = probe_features["z_imro_formats"][imro_format + "_elm_flds"] + imro_fields = tuple(imro_fields_string.replace("(", "").replace(")", "").split(" ")) + + # Parse IMRO table values into per-channel data + # Skip the header "(probe_type,num_chans)" and trailing empty string + _, *imro_table_values_list, _ = imro_table_string.strip().split(")") + imro_per_channel = {k: [] for k in imro_fields} + for field_values_str in imro_table_values_list: + values = tuple(map(int, field_values_str[1:].split(" "))) + for field, field_value in zip(imro_fields, values): + imro_per_channel[field].append(field_value) + + # Ensure "electrode" key always exists with physical electrode IDs + # Different probe types encode electrode selection differently + if "electrode" not in imro_per_channel: + # NP1.0: Bank-based addressing (physical_electrode_id = bank * 384 + channel) + readout_channel_ids = np.array(imro_per_channel["channel"]) + bank_key = "bank" if "bank" in imro_per_channel else "bank_mask" + bank_indices = np.array(imro_per_channel[bank_key]) + imro_per_channel["electrode"] = (bank_indices * 384 + readout_channel_ids).tolist() + + return imro_per_channel + - See http://billkarsh.github.io/SpikeGLX/#metadata-guides for implementation. - The x_pitch/y_pitch/width are set automatically depending on the NP version. +def read_spikeglx(file: str | Path) -> Probe: + """ + Read probe geometry and configuration from a SpikeGLX metadata file. - The shape is auto generated as a shank. + This function reconstructs the probe used in a recording by: + 1. Reading the probe part number from metadata + 2. Building the full probe geometry from manufacturer specifications + 3. Slicing to the electrodes selected in the IMRO table + 4. Further slicing to channels actually saved to disk (if subset was saved) + 5. Adding recording-specific annotations + 6. Add wiring (device channel indices) Parameters ---------- file : Path or str - The .meta file path + Path to the SpikeGLX .meta file Returns ------- - probe : Probe object + probe : Probe + Probe object with geometry, contact annotations, and device channel mapping + + See Also + -------- + http://billkarsh.github.io/SpikeGLX/#metadata-guides """ @@ -566,42 +815,73 @@ def read_spikeglx(file: str | Path) -> Probe: assert meta_file.suffix == ".meta", "'meta_file' should point to the .meta SpikeGLX file" meta = parse_spikeglx_meta(meta_file) - assert "imroTbl" in meta, "Could not find imroTbl field in meta file!" - imro_table = meta["imroTbl"] - - # read serial number - imDatPrb_serial_number = meta.get("imDatPrb_sn", None) - if imDatPrb_serial_number is None: # this is for Phase3A - imDatPrb_serial_number = meta.get("imProbeSN", None) - # read other metadata + # ===== 1. Extract probe part number from metadata ===== imDatPrb_pn = meta.get("imDatPrb_pn", None) - imDatPrb_port = meta.get("imDatPrb_port", None) - imDatPrb_slot = meta.get("imDatPrb_slot", None) - imDatPrb_part_number = meta.get("imDatPrb_pn", None) - - # Only Phase3a probe has "imProbeOpt". Map this to NP10101. + # Only Phase3a probe has "imProbeOpt". Map this to NP1010. if meta.get("imProbeOpt") is not None: imDatPrb_pn = "NP1010" - probe = _read_imro_string(imro_str=imro_table, imDatPrb_pn=imDatPrb_pn) + # ===== 2. Build full probe with all possible contacts ===== + # This creates the complete probe geometry (e.g., 960 contacts for NP1.0) + # based on manufacturer specifications + full_probe = build_neuropixels_probe(probe_part_number=imDatPrb_pn) + + # ===== 3. Parse IMRO table to extract recorded electrodes and acquisition settings ===== + # IMRO = Imec ReadOut (the configuration table format from IMEC manufacturer) + # Specifies which electrodes were selected for recording (e.g., 384 of 960) plus their + # acquisition settings (gains, references, filters). See: https://billkarsh.github.io/SpikeGLX/help/imroTables/ + imro_table_string = meta["imroTbl"] + imro_per_channel = _parse_imro_string(imro_table_string, imDatPrb_pn) + + # ===== 4. Build contact IDs for active electrodes ===== + # Convert physical electrode IDs to probeinterface canonical contact ID strings + imro_electrode = imro_per_channel["electrode"] + imro_shank = imro_per_channel.get("shank", [None] * len(imro_electrode)) + active_contact_ids = [ + _build_canonical_contact_id(elec_id, shank_id) for shank_id, elec_id in zip(imro_shank, imro_electrode) + ] + + # ===== 5. Slice full probe to active electrodes ===== + # Find indices of active contacts in the full probe, preserving IMRO order + contact_id_to_index = {contact_id: idx for idx, contact_id in enumerate(full_probe.contact_ids)} + selected_contact_indices = np.array([contact_id_to_index[contact_id] for contact_id in active_contact_ids]) + + probe = full_probe.get_slice(selected_contact_indices) + + # ===== 6. Store IMRO properties (acquisition settings) as annotations ===== + # Filter IMRO data to only the properties we want to add as annotations + imro_properties_to_add = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt") + imro_filtered = {k: v for k, v in imro_per_channel.items() if k in imro_properties_to_add and len(v) > 0} + # Map IMRO field names to probeinterface field names and add as contact annotations + annotations = {} + for imro_field, values in imro_filtered.items(): + pi_field = imro_field_to_pi_field.get(imro_field) + annotations[pi_field] = values + probe.annotate_contacts(**annotations) + + # ===== 7. Slice to saved channels (if subset was saved) ===== + # This is DIFFERENT from IMRO selection: IMRO selects which electrodes to acquire, + # but SpikeGLX can optionally save only a subset of acquired channels to reduce file size. + # For example: IMRO selects 384 electrodes, but only 300 are saved to disk. + saved_chans = get_saved_channel_indices_from_spikeglx_meta(meta_file) + saved_chans = saved_chans[saved_chans < probe.get_contact_count()] # Remove SYS channels + if saved_chans.size != probe.get_contact_count(): + probe = probe.get_slice(saved_chans) - # add serial number and other annotations + # ===== 6. Add recording-specific annotations ===== + # These annotations identify the physical probe instance and recording setup + imDatPrb_serial_number = meta.get("imDatPrb_sn") or meta.get("imProbeSN") # Phase3A uses imProbeSN + imDatPrb_port = meta.get("imDatPrb_port", None) + imDatPrb_slot = meta.get("imDatPrb_slot", None) probe.annotate(serial_number=imDatPrb_serial_number) - probe.annotate(part_number=imDatPrb_part_number) + probe.annotate(part_number=imDatPrb_pn) probe.annotate(port=imDatPrb_port) probe.annotate(slot=imDatPrb_slot) - probe.annotate(serial_number=imDatPrb_serial_number) - # sometimes we need to slice the probe when not all channels are saved - saved_chans = get_saved_channel_indices_from_spikeglx_meta(meta_file) - # remove the SYS chans - saved_chans = saved_chans[saved_chans < probe.get_contact_count()] - if saved_chans.size != probe.get_contact_count(): - # slice if needed - probe = probe.get_slice(saved_chans) - # wire it + # ===== 7. Set device channel indices (wiring) ===== + # I am unsure why are we are doing this. If someone knows please document it here. probe.set_device_channel_indices(np.arange(probe.get_contact_count())) return probe