Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 245 additions & 1 deletion tidy3d/components/data/monitor_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,74 @@ def _tangential_dims(self) -> list[str]:

return tangential_dims

@cached_property
def _yee_integration_sizes(self) -> tuple[dict[str, np.ndarray], dict[str, np.ndarray]]:
"""Return primal (cell) and dual step sizes for Yee grid integration.

The sizes are clamped to the monitor bounds, so cells partially outside the monitor
have reduced sizes, and cells completely outside have size 0.

For Yee grid flux integration:
- E1×H2 term uses: cell_dim1 (primal) × dual_dim2
- E2×H1 term uses: dual_dim1 × cell_dim2 (primal)

Returns
-------
tuple[dict[str, np.ndarray], dict[str, np.ndarray]]
A tuple of (cell_sizes, dual_sizes) dictionaries with monitor-bounds-adjusted
step sizes for each tangential dimension.
"""

# cell_sizes = self.grid_expanded._primal_steps
# dual_sizes = self.grid_expanded._dual_steps

_, plane_inds = self.monitor.pop_axis([0, 1, 2], self.monitor.size.index(0.0))
dims = ["x", "y", "z"]
mnt_bounds = np.array(self.monitor.bounds)

cell_sizes = {}
dual_sizes = {}

for axis in plane_inds:
dim = dims[axis]
mnt_min = mnt_bounds[0, axis]
mnt_max = mnt_bounds[1, axis]
centers = self._tangential_fields["E" + dim].coords[dim].values
centers = np.concatenate([[mnt_min], centers])
boundaries = self._tangential_fields["H" + dim].coords[dim].values
boundaries = np.concatenate([boundaries, [mnt_max]])

# Dual sizes: distances between cell centers, clamped to monitor bounds
# Build coords array: [first_boundary, centers]
# Need an extract dummy value to get the correct length of dual_sizes, the
# first dual step is partially outside the monitor bounds.
dual_coords = np.clip(centers, mnt_min, mnt_max)
dual_sizes[dim] = dual_coords[1:] - dual_coords[:-1]

# Cell/primal sizes: distances between boundaries, clamped to monitor bounds
cell_coords = np.clip(boundaries, mnt_min, mnt_max)
cell_sizes[dim] = cell_coords[1:] - cell_coords[:-1]

# boundaries = self.grid_expanded.boundaries.to_dict[dim]
# centers = self.grid_expanded.centers.to_dict[dim]

# dim_idx = dims.index(dim)

# # Dual sizes: distances between cell centers, clamped to monitor bounds
# # Build coords array: [first_boundary, centers]
# # Need an extract dummy value to get the correct length of dual_sizes, the
# # first dual step is partially outside the monitor bounds.
# dual_coords = np.concatenate([[boundaries[0]], centers])
# dual_coords = np.clip(dual_coords, mnt_min, mnt_max)
# dual_sizes[dim] = dual_coords[1:] - dual_coords[:-1]

# # Cell/primal sizes: distances between boundaries, clamped to monitor bounds
# cell_coords = boundaries
# cell_coords = np.clip(cell_coords, mnt_min, mnt_max)
# cell_sizes[dim] = cell_coords[1:] - cell_coords[:-1]

return cell_sizes, dual_sizes

@property
def colocation_boundaries(self) -> Coords:
"""Coordinates to be used for colocation of the data to grid boundaries."""
Expand Down Expand Up @@ -696,7 +764,8 @@ def package_flux_results(self, flux_values: DataArray) -> Any:
@cached_property
def complex_flux(self) -> Union[FluxDataArray, FreqModeDataArray]:
"""Flux for data corresponding to a 2D monitor."""

if self.monitor.colocate is False:
return self._complex_flux_yee
# Compute flux by integrating Poynting vector in-plane
d_area = self._diff_area
poynting = self.complex_poynting
Expand All @@ -711,6 +780,48 @@ def flux(self) -> Union[FluxDataArray, FreqModeDataArray]:
"""Flux for data corresponding to a 2D monitor."""
return self.complex_flux.real

@cached_property
def _complex_flux_yee(self) -> FluxDataArray:
"""Flux for data corresponding to a 2D monitor."""

# Tangential fields are ordered as E1, E2, H1, H2
self._check_suitability_for_flux_yee()
tan_fields = self._tangential_fields
dim1, dim2 = self._tangential_dims
E1 = tan_fields["E" + dim1]
E2 = tan_fields["E" + dim2]
H1 = tan_fields["H" + dim1]
H2 = tan_fields["H" + dim2]

e_x_h_pos1 = E1 * H2.conj()
e_x_h_pos2 = E2 * H1.conj()

# Get boundary-adjusted cell sizes for proper integration weights
cell_sizes, dual_cell_sizes = self._yee_integration_sizes

# Create DataArrays for cell sizes - xarray broadcasts over other dims automatically
da_cell_dim1 = xr.DataArray(cell_sizes[dim1], dims=[dim1], coords={dim1: E1.coords[dim1]})
da_dual_dim1 = xr.DataArray(
dual_cell_sizes[dim1], dims=[dim1], coords={dim1: E2.coords[dim1]}
)
da_cell_dim2 = xr.DataArray(cell_sizes[dim2], dims=[dim2], coords={dim2: E2.coords[dim2]})
da_dual_dim2 = xr.DataArray(
dual_cell_sizes[dim2], dims=[dim2], coords={dim2: E1.coords[dim2]}
)

E1_H2_darea = da_cell_dim1 * da_dual_dim2
E2_H1_darea = da_dual_dim1 * da_cell_dim2

term1 = (e_x_h_pos1 * E1_H2_darea).sum(dim=[dim1, dim2])
term2 = -(e_x_h_pos2 * E2_H1_darea).sum(dim=[dim1, dim2])

return FluxDataArray((term1 + term2) / 2)

@cached_property
def _flux_yee(self) -> FluxDataArray:
"""Flux for data corresponding to a 2D monitor."""
return self._complex_flux_yee.real

@cached_property
def mode_area(self) -> FreqModeDataArray:
r"""Effective mode area corresponding to a 2D monitor.
Expand Down Expand Up @@ -767,6 +878,8 @@ def dot(
In the non-conjugated definition, modes are orthogonal, but the interpretation of the
dot product power carried by a given mode is no longer valid.
"""
if self.monitor.colocate is False and field_data.monitor.colocate is False:
return self._dot_yee(field_data, conjugate=conjugate)

# Tangential fields for current and other field data
fields_self = self._colocated_tangential_fields
Expand Down Expand Up @@ -820,6 +933,95 @@ def _tangential_fields_match_coords(self, coords: ArrayFloat2D) -> bool:
return False
return True

def _dot_yee(
self, field_data: Union[FieldData, ModeData, ModeSolverData], conjugate: bool = True
) -> ModeAmpsDataArray:
r"""Dot product (modal overlap) with another :class:`.FieldData` object. Both datasets have
to be frequency-domain data associated with a 2D monitor. Along the tangential directions,
the datasets have to have the same discretization. Along the normal direction, the monitor
position may differ and is ignored. Other coordinates (``frequency``, ``mode_index``) have
to be either identical or broadcastable. Broadcasting is also supported in the case in
which the other ``field_data`` has a dimension of size 1 whose coordinate is not in the list
of coordinates in the ``self`` dataset along the corresponding dimension. In that case, the
coordinates of the ``self`` dataset are used in the output.

The dot product is defined as:

.. math:

\frac{1}{4} \int \left( E_0 \times H_1^* + H_0^* \times E_1 \) \, {\rm d}S

Parameters
----------
field_data : :class:`ElectromagneticFieldData`
A data instance to compute the dot product with.
conjugate : bool, optional
If ``True`` (default), the dot product is defined as above. If ``False``, the definition
is similar, but without the complex conjugation of the $H$ fields.

Note
----
The dot product with and without conjugation is equivalent (up to a phase) for
modes in lossless waveguides but differs for modes in lossy materials. In that case,
the conjugated dot product can be interpreted as the fraction of the power of the first
mode carried by the second, but modes are not orthogonal with respect to that product
and the sum of carried power fractions may be different from the total flux.
In the non-conjugated definition, modes are orthogonal, but the interpretation of the
dot product power carried by a given mode is no longer valid.
"""

# Tangential fields for current and other field data
# fields_self = self._colocated_tangential_fields
fields_self = self._tangential_fields
# fields_self = {key: field.isel(z=0, drop=True) for key, field in self.field_components.items()}

if conjugate:
fields_self = {key: field.conj() for key, field in fields_self.items()}

# fields_other = field_data._interpolated_tangential_fields(self._plane_grid_boundaries)
fields_other = field_data._tangential_fields

# Drop size-1 dimensions in the other data
fields_other = {key: field.squeeze(drop=True) for key, field in fields_other.items()}

# Cross products of fields
dim1, dim2 = self._tangential_dims

E1xH2 = (
fields_self["E" + dim1] * fields_other["H" + dim2]
+ fields_self["H" + dim2] * fields_other["E" + dim1]
)
E2xH1 = (
fields_self["E" + dim2] * fields_other["H" + dim1]
+ fields_self["H" + dim1] * fields_other["E" + dim2]
)

# Get boundary-adjusted cell sizes for proper integration weights
cell_sizes, dual_cell_sizes = self._yee_integration_sizes

# Create DataArrays for cell sizes - xarray broadcasts over other dims automatically
# E1xH2 uses: cell_dim1 (E1 coord) × dual_dim2 (H2 coord, same as E1)
da_cell_dim1 = xr.DataArray(
cell_sizes[dim1], dims=[dim1], coords={dim1: fields_self["E" + dim1].coords[dim1]}
)
da_dual_dim2 = xr.DataArray(
dual_cell_sizes[dim2], dims=[dim2], coords={dim2: fields_self["E" + dim1].coords[dim2]}
)
# E2xH1 uses: dual_dim1 (E2 coord) × cell_dim2 (H1 coord, same as E2)
da_dual_dim1 = xr.DataArray(
dual_cell_sizes[dim1], dims=[dim1], coords={dim1: fields_self["E" + dim2].coords[dim1]}
)
da_cell_dim2 = xr.DataArray(
cell_sizes[dim2], dims=[dim2], coords={dim2: fields_self["E" + dim2].coords[dim2]}
)

E1_H2_dS = da_cell_dim1 * da_dual_dim2
E2_H1_dS = da_dual_dim1 * da_cell_dim2

term1 = (E1xH2 * E1_H2_dS).sum(dim=[dim1, dim2])
term2 = -(E2xH1 * E2_H1_dS).sum(dim=[dim1, dim2])
return ModeAmpsDataArray(0.25 * (term1 + term2))

def _interpolated_tangential_fields(self, coords: ArrayFloat2D) -> dict[str, DataArray]:
"""For 2D monitors, interpolate this fields to given coords in the tangential plane.

Expand Down Expand Up @@ -1309,6 +1511,48 @@ def _interpolated_copies_if_needed(
other_copy = other.interpolated_copy if isinstance(other, ModeSolverData) else other
return self_copy, other_copy

def _check_suitability_for_flux_yee(self):
"""Check if fields come from uncolocated type monitor and can be used with flux_yee."""
# Tangential fields are ordered as E1, E2, H1, H2
dim1, dim2 = self._tangential_dims
E1 = self._tangential_fields["E" + dim1]
E2 = self._tangential_fields["E" + dim2]
H1 = self._tangential_fields["H" + dim1]
H2 = self._tangential_fields["H" + dim2]

# Check that E1/H2 and E2/H1 coordinates match
eh_1_match = all(E1.coords[c].equals(H2.coords[c]) for c in self._tangential_dims)
eh_2_match = all(E2.coords[c].equals(H1.coords[c]) for c in self._tangential_dims)
if not (eh_1_match and eh_2_match):
raise ValueError("Cannot use '_flux_yee' on the provided field data.")

# Check that field array lengths match the grid sizes
primal_sizes, dual_sizes = self._yee_integration_sizes

# E1 uses primal in dim1, dual in dim2
if len(E1.coords[dim1]) != len(primal_sizes[dim1]):
raise ValueError(
f"Field coordinate length ({len(E1.coords[dim1])}) does not match "
f"grid primal size length ({len(primal_sizes[dim1])}) in dimension '{dim1}'."
)
if len(E1.coords[dim2]) != len(dual_sizes[dim2]):
raise ValueError(
f"Field coordinate length ({len(E1.coords[dim2])}) does not match "
f"grid dual size length ({len(dual_sizes[dim2])}) in dimension '{dim2}'."
)

# E2 uses dual in dim1, primal in dim2
if len(E2.coords[dim1]) != len(dual_sizes[dim1]):
raise ValueError(
f"Field coordinate length ({len(E2.coords[dim1])}) does not match "
f"grid dual size length ({len(dual_sizes[dim1])}) in dimension '{dim1}'."
)
if len(E2.coords[dim2]) != len(primal_sizes[dim2]):
raise ValueError(
f"Field coordinate length ({len(E2.coords[dim2])}) does not match "
f"grid primal size length ({len(primal_sizes[dim2])}) in dimension '{dim2}'."
)


class FieldData(FieldDataset, ElectromagneticFieldData):
"""
Expand Down
8 changes: 8 additions & 0 deletions tidy3d/components/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,14 @@ class SurfaceIntegrationMonitor(Monitor, ABC):
description="Surfaces to exclude in the integration, if a volume monitor.",
)

colocate: bool = pydantic.Field(
True,
title="Colocate Fields",
description="Defines whether fields are colocated to grid cell boundaries (i.e. to the "
"primal grid). Can be toggled for field recording monitors and is hard-coded for other "
"monitors depending on their specific function.",
)

@property
def integration_surfaces(self):
"""Surfaces of the monitor where fields will be recorded for subsequent integration."""
Expand Down