diff --git a/tidy3d/components/data/monitor_data.py b/tidy3d/components/data/monitor_data.py index b2bda4e653..cf63616464 100644 --- a/tidy3d/components/data/monitor_data.py +++ b/tidy3d/components/data/monitor_data.py @@ -450,6 +450,74 @@ def _tangential_dims(self) -> list[str]: return tangential_dims + @cached_property + def _yee_integration_sizes(self) -> tuple[dict[str, np.ndarray], dict[str, np.ndarray]]: + """Return primal (cell) and dual step sizes for Yee grid integration. + + The sizes are clamped to the monitor bounds, so cells partially outside the monitor + have reduced sizes, and cells completely outside have size 0. + + For Yee grid flux integration: + - E1×H2 term uses: cell_dim1 (primal) × dual_dim2 + - E2×H1 term uses: dual_dim1 × cell_dim2 (primal) + + Returns + ------- + tuple[dict[str, np.ndarray], dict[str, np.ndarray]] + A tuple of (cell_sizes, dual_sizes) dictionaries with monitor-bounds-adjusted + step sizes for each tangential dimension. + """ + + # cell_sizes = self.grid_expanded._primal_steps + # dual_sizes = self.grid_expanded._dual_steps + + _, plane_inds = self.monitor.pop_axis([0, 1, 2], self.monitor.size.index(0.0)) + dims = ["x", "y", "z"] + mnt_bounds = np.array(self.monitor.bounds) + + cell_sizes = {} + dual_sizes = {} + + for axis in plane_inds: + dim = dims[axis] + mnt_min = mnt_bounds[0, axis] + mnt_max = mnt_bounds[1, axis] + centers = self._tangential_fields["E" + dim].coords[dim].values + centers = np.concatenate([[mnt_min], centers]) + boundaries = self._tangential_fields["H" + dim].coords[dim].values + boundaries = np.concatenate([boundaries, [mnt_max]]) + + # Dual sizes: distances between cell centers, clamped to monitor bounds + # Build coords array: [first_boundary, centers] + # Need an extract dummy value to get the correct length of dual_sizes, the + # first dual step is partially outside the monitor bounds. + dual_coords = np.clip(centers, mnt_min, mnt_max) + dual_sizes[dim] = dual_coords[1:] - dual_coords[:-1] + + # Cell/primal sizes: distances between boundaries, clamped to monitor bounds + cell_coords = np.clip(boundaries, mnt_min, mnt_max) + cell_sizes[dim] = cell_coords[1:] - cell_coords[:-1] + + # boundaries = self.grid_expanded.boundaries.to_dict[dim] + # centers = self.grid_expanded.centers.to_dict[dim] + + # dim_idx = dims.index(dim) + + # # Dual sizes: distances between cell centers, clamped to monitor bounds + # # Build coords array: [first_boundary, centers] + # # Need an extract dummy value to get the correct length of dual_sizes, the + # # first dual step is partially outside the monitor bounds. + # dual_coords = np.concatenate([[boundaries[0]], centers]) + # dual_coords = np.clip(dual_coords, mnt_min, mnt_max) + # dual_sizes[dim] = dual_coords[1:] - dual_coords[:-1] + + # # Cell/primal sizes: distances between boundaries, clamped to monitor bounds + # cell_coords = boundaries + # cell_coords = np.clip(cell_coords, mnt_min, mnt_max) + # cell_sizes[dim] = cell_coords[1:] - cell_coords[:-1] + + return cell_sizes, dual_sizes + @property def colocation_boundaries(self) -> Coords: """Coordinates to be used for colocation of the data to grid boundaries.""" @@ -696,7 +764,8 @@ def package_flux_results(self, flux_values: DataArray) -> Any: @cached_property def complex_flux(self) -> Union[FluxDataArray, FreqModeDataArray]: """Flux for data corresponding to a 2D monitor.""" - + if self.monitor.colocate is False: + return self._complex_flux_yee # Compute flux by integrating Poynting vector in-plane d_area = self._diff_area poynting = self.complex_poynting @@ -711,6 +780,48 @@ def flux(self) -> Union[FluxDataArray, FreqModeDataArray]: """Flux for data corresponding to a 2D monitor.""" return self.complex_flux.real + @cached_property + def _complex_flux_yee(self) -> FluxDataArray: + """Flux for data corresponding to a 2D monitor.""" + + # Tangential fields are ordered as E1, E2, H1, H2 + self._check_suitability_for_flux_yee() + tan_fields = self._tangential_fields + dim1, dim2 = self._tangential_dims + E1 = tan_fields["E" + dim1] + E2 = tan_fields["E" + dim2] + H1 = tan_fields["H" + dim1] + H2 = tan_fields["H" + dim2] + + e_x_h_pos1 = E1 * H2.conj() + e_x_h_pos2 = E2 * H1.conj() + + # Get boundary-adjusted cell sizes for proper integration weights + cell_sizes, dual_cell_sizes = self._yee_integration_sizes + + # Create DataArrays for cell sizes - xarray broadcasts over other dims automatically + da_cell_dim1 = xr.DataArray(cell_sizes[dim1], dims=[dim1], coords={dim1: E1.coords[dim1]}) + da_dual_dim1 = xr.DataArray( + dual_cell_sizes[dim1], dims=[dim1], coords={dim1: E2.coords[dim1]} + ) + da_cell_dim2 = xr.DataArray(cell_sizes[dim2], dims=[dim2], coords={dim2: E2.coords[dim2]}) + da_dual_dim2 = xr.DataArray( + dual_cell_sizes[dim2], dims=[dim2], coords={dim2: E1.coords[dim2]} + ) + + E1_H2_darea = da_cell_dim1 * da_dual_dim2 + E2_H1_darea = da_dual_dim1 * da_cell_dim2 + + term1 = (e_x_h_pos1 * E1_H2_darea).sum(dim=[dim1, dim2]) + term2 = -(e_x_h_pos2 * E2_H1_darea).sum(dim=[dim1, dim2]) + + return FluxDataArray((term1 + term2) / 2) + + @cached_property + def _flux_yee(self) -> FluxDataArray: + """Flux for data corresponding to a 2D monitor.""" + return self._complex_flux_yee.real + @cached_property def mode_area(self) -> FreqModeDataArray: r"""Effective mode area corresponding to a 2D monitor. @@ -767,6 +878,8 @@ def dot( In the non-conjugated definition, modes are orthogonal, but the interpretation of the dot product power carried by a given mode is no longer valid. """ + if self.monitor.colocate is False and field_data.monitor.colocate is False: + return self._dot_yee(field_data, conjugate=conjugate) # Tangential fields for current and other field data fields_self = self._colocated_tangential_fields @@ -820,6 +933,95 @@ def _tangential_fields_match_coords(self, coords: ArrayFloat2D) -> bool: return False return True + def _dot_yee( + self, field_data: Union[FieldData, ModeData, ModeSolverData], conjugate: bool = True + ) -> ModeAmpsDataArray: + r"""Dot product (modal overlap) with another :class:`.FieldData` object. Both datasets have + to be frequency-domain data associated with a 2D monitor. Along the tangential directions, + the datasets have to have the same discretization. Along the normal direction, the monitor + position may differ and is ignored. Other coordinates (``frequency``, ``mode_index``) have + to be either identical or broadcastable. Broadcasting is also supported in the case in + which the other ``field_data`` has a dimension of size 1 whose coordinate is not in the list + of coordinates in the ``self`` dataset along the corresponding dimension. In that case, the + coordinates of the ``self`` dataset are used in the output. + + The dot product is defined as: + + .. math: + + \frac{1}{4} \int \left( E_0 \times H_1^* + H_0^* \times E_1 \) \, {\rm d}S + + Parameters + ---------- + field_data : :class:`ElectromagneticFieldData` + A data instance to compute the dot product with. + conjugate : bool, optional + If ``True`` (default), the dot product is defined as above. If ``False``, the definition + is similar, but without the complex conjugation of the $H$ fields. + + Note + ---- + The dot product with and without conjugation is equivalent (up to a phase) for + modes in lossless waveguides but differs for modes in lossy materials. In that case, + the conjugated dot product can be interpreted as the fraction of the power of the first + mode carried by the second, but modes are not orthogonal with respect to that product + and the sum of carried power fractions may be different from the total flux. + In the non-conjugated definition, modes are orthogonal, but the interpretation of the + dot product power carried by a given mode is no longer valid. + """ + + # Tangential fields for current and other field data + # fields_self = self._colocated_tangential_fields + fields_self = self._tangential_fields + # fields_self = {key: field.isel(z=0, drop=True) for key, field in self.field_components.items()} + + if conjugate: + fields_self = {key: field.conj() for key, field in fields_self.items()} + + # fields_other = field_data._interpolated_tangential_fields(self._plane_grid_boundaries) + fields_other = field_data._tangential_fields + + # Drop size-1 dimensions in the other data + fields_other = {key: field.squeeze(drop=True) for key, field in fields_other.items()} + + # Cross products of fields + dim1, dim2 = self._tangential_dims + + E1xH2 = ( + fields_self["E" + dim1] * fields_other["H" + dim2] + + fields_self["H" + dim2] * fields_other["E" + dim1] + ) + E2xH1 = ( + fields_self["E" + dim2] * fields_other["H" + dim1] + + fields_self["H" + dim1] * fields_other["E" + dim2] + ) + + # Get boundary-adjusted cell sizes for proper integration weights + cell_sizes, dual_cell_sizes = self._yee_integration_sizes + + # Create DataArrays for cell sizes - xarray broadcasts over other dims automatically + # E1xH2 uses: cell_dim1 (E1 coord) × dual_dim2 (H2 coord, same as E1) + da_cell_dim1 = xr.DataArray( + cell_sizes[dim1], dims=[dim1], coords={dim1: fields_self["E" + dim1].coords[dim1]} + ) + da_dual_dim2 = xr.DataArray( + dual_cell_sizes[dim2], dims=[dim2], coords={dim2: fields_self["E" + dim1].coords[dim2]} + ) + # E2xH1 uses: dual_dim1 (E2 coord) × cell_dim2 (H1 coord, same as E2) + da_dual_dim1 = xr.DataArray( + dual_cell_sizes[dim1], dims=[dim1], coords={dim1: fields_self["E" + dim2].coords[dim1]} + ) + da_cell_dim2 = xr.DataArray( + cell_sizes[dim2], dims=[dim2], coords={dim2: fields_self["E" + dim2].coords[dim2]} + ) + + E1_H2_dS = da_cell_dim1 * da_dual_dim2 + E2_H1_dS = da_dual_dim1 * da_cell_dim2 + + term1 = (E1xH2 * E1_H2_dS).sum(dim=[dim1, dim2]) + term2 = -(E2xH1 * E2_H1_dS).sum(dim=[dim1, dim2]) + return ModeAmpsDataArray(0.25 * (term1 + term2)) + def _interpolated_tangential_fields(self, coords: ArrayFloat2D) -> dict[str, DataArray]: """For 2D monitors, interpolate this fields to given coords in the tangential plane. @@ -1309,6 +1511,48 @@ def _interpolated_copies_if_needed( other_copy = other.interpolated_copy if isinstance(other, ModeSolverData) else other return self_copy, other_copy + def _check_suitability_for_flux_yee(self): + """Check if fields come from uncolocated type monitor and can be used with flux_yee.""" + # Tangential fields are ordered as E1, E2, H1, H2 + dim1, dim2 = self._tangential_dims + E1 = self._tangential_fields["E" + dim1] + E2 = self._tangential_fields["E" + dim2] + H1 = self._tangential_fields["H" + dim1] + H2 = self._tangential_fields["H" + dim2] + + # Check that E1/H2 and E2/H1 coordinates match + eh_1_match = all(E1.coords[c].equals(H2.coords[c]) for c in self._tangential_dims) + eh_2_match = all(E2.coords[c].equals(H1.coords[c]) for c in self._tangential_dims) + if not (eh_1_match and eh_2_match): + raise ValueError("Cannot use '_flux_yee' on the provided field data.") + + # Check that field array lengths match the grid sizes + primal_sizes, dual_sizes = self._yee_integration_sizes + + # E1 uses primal in dim1, dual in dim2 + if len(E1.coords[dim1]) != len(primal_sizes[dim1]): + raise ValueError( + f"Field coordinate length ({len(E1.coords[dim1])}) does not match " + f"grid primal size length ({len(primal_sizes[dim1])}) in dimension '{dim1}'." + ) + if len(E1.coords[dim2]) != len(dual_sizes[dim2]): + raise ValueError( + f"Field coordinate length ({len(E1.coords[dim2])}) does not match " + f"grid dual size length ({len(dual_sizes[dim2])}) in dimension '{dim2}'." + ) + + # E2 uses dual in dim1, primal in dim2 + if len(E2.coords[dim1]) != len(dual_sizes[dim1]): + raise ValueError( + f"Field coordinate length ({len(E2.coords[dim1])}) does not match " + f"grid dual size length ({len(dual_sizes[dim1])}) in dimension '{dim1}'." + ) + if len(E2.coords[dim2]) != len(primal_sizes[dim2]): + raise ValueError( + f"Field coordinate length ({len(E2.coords[dim2])}) does not match " + f"grid primal size length ({len(primal_sizes[dim2])}) in dimension '{dim2}'." + ) + class FieldData(FieldDataset, ElectromagneticFieldData): """ diff --git a/tidy3d/components/monitor.py b/tidy3d/components/monitor.py index df4ec33049..b3290e0d3b 100644 --- a/tidy3d/components/monitor.py +++ b/tidy3d/components/monitor.py @@ -658,6 +658,14 @@ class SurfaceIntegrationMonitor(Monitor, ABC): description="Surfaces to exclude in the integration, if a volume monitor.", ) + colocate: bool = pydantic.Field( + True, + title="Colocate Fields", + description="Defines whether fields are colocated to grid cell boundaries (i.e. to the " + "primal grid). Can be toggled for field recording monitors and is hard-coded for other " + "monitors depending on their specific function.", + ) + @property def integration_surfaces(self): """Surfaces of the monitor where fields will be recorded for subsequent integration."""