adforce package

Submodules

adforce.ani module

Animate the outputs and inputs of ADCIRC simulations.

TODO: There is a lot of repeated code in the plotting functions. This should be refactored.

adforce.ani.ani_heights(path_in='mult1', step_size=1, add_name='', bbox=[('Latitude bounds', [28.5, 30.8], 'degrees_north'), ('Longitude bounds', [-92, -86.5], 'degrees_east'), 'New Orleans Area Bounding Box'])

Plot heights.

Parameters:
  • path_in (str, optional) – name of data folder. Defaults to “mult1”.

  • step_size (int, optional) – step size. Defaults to 1.

  • add_name (str, optional) – additional prefix name. Defaults to “”.

  • bbox (BoundingBox, optional) – bounding box. Defaults to NO_BBOX.

Return type:

None

adforce.ani.ani_heights_and_winds(path_in='mult1', step_size=1, add_name='', bbox=[('Latitude bounds', [28.5, 30.8], 'degrees_north'), ('Longitude bounds', [-92, -86.5], 'degrees_east'), 'New Orleans Area Bounding Box'], x_pos=0.9, y_pos=1.05, scale=800, coarsen=2)

Plot heights and winds.

Assumes that the heights on the mesh are in the fort.63.nc file and the winds on the grid are in the fort.22.nc file.

Equally we might want to plot the winds on the mesh instead. This might get rid of some of the time synchronization issues.

Parameters:
  • path_in (str, optional) – name of data folder. Defaults to “mult1”.

  • step_size (int, optional) – step size. Defaults to 1.

  • add_name (str, optional) – additional prefix name. Defaults to “”.

  • bbox (BoundingBox, optional) – bounding box. Defaults to NO_BBOX.

  • x_pos (float, optional) – relative x position of quiver label.

  • y_pos (float, optional) – relative y position of quiver label.

Return type:

None

adforce.ani.find_starttime_of_impact(da)

Helper function to find first nonzero time in timeseries.

Parameters:

da (xr.DataArray) – timeseries dataarray.

Returns:

start time of nonzeros.

Return type:

any

adforce.ani.line_up_f22(f22_main_ds, path_in, point)

Line up fort.22.

Parameters:
  • f22_main_ds (xr.Dataset) – main fort.22.nc dataset.

  • path_in (str) – path for inputs.

  • point (Point) – point to compare at.

Returns:

f22_main_ds.

Return type:

xr.Dataset

adforce.ani.plot_locations(path, ax, transform=None)

Plot the attempted and actual observation locations.

Parameters:
  • path (str) – path the config.yaml file is in.

  • ax (plt.Axes) – axes to plot on.

  • transform (optional) – transform to use for the plot. Defaults to None.

Return type:

None

adforce.ani.plot_trajectory(path, ax, transform=None)

Plot the trajectory of the tropical cyclone.

Parameters:
  • path (str) – path the config.yaml file is in.

  • ax (plt.Axes) – axes to plot on.

  • transform (optional) – transform to use for the plot. Defaults to None.

Return type:

None

adforce.ani.plot_u10_windx_at_a_point(path_in, point, plot=True)

Plot U10 at a point so that we can try to sync up the fort.22 and fort.74 files time coordinate.

Parameters:
  • path_in (str) – path in.

  • point (Point) – point to compare at (nearest mesh point to).

  • plot (bool, optional) – Defaults to True.

Return type:

None

adforce.ani.regenerate_fort22_if_does_not_exist(path)

Regenerate fort.22 if it does not exist.

Parameters:

path (str) – path to fort.22.nc.

Return type:

None

adforce.ani.run_animation()

Run the animation based on command line arguments.

Return type:

None

adforce.ani.single_wind_and_height_step(path_in='/work/n01/n01/sithom/adcirc-swan/tcpips/exp/new-orleans-2015/exp_0049', time_i=380, coarsen=2, bbox=[('Latitude bounds', [28.5, 30.8], 'degrees_north'), ('Longitude bounds', [-92, -86.5], 'degrees_east'), 'New Orleans Area Bounding Box'], x_pos=0.4, y_pos=1.05, scale=800, plot_loc=False, figure_name='snapshot.pdf')

Plot a single snapshot of height and wind.

Parameters:
  • path_in (str, optional) – path to data. Defaults to “/work/n01/n01/sithom/adcirc-swan/tcpips/exp/new-orleans-2015/exp_0049”.

  • time_i (Optional[int], optional) – time index. Defaults to 380. If None, we find the maximum maxele for the observation location.

  • coarsen (int, optional) – coarsen the wind data by this factor. Defaults to 2.

  • bbox (Optional[BoundingBox], optional) – bounding box. Defaults to NO_BBOX.

  • x_pos (float, optional) – relative x position of quiver label. Defaults to 0.9.

  • y_pos (float, optional) – relative y position of quiver label. Defaults to 1.05.

  • scale (float, optional) – scale of the quiver plot. Defaults to 800.

  • plot_loc (bool, optional) – whether to plot the locations. Defaults to False.

  • figure_name (str, optional) – name of the figure. Defaults to “snapshot.pdf”.

Return type:

None

adforce.boundaries module

Script to explore the boundaries of the ADCIRC mesh.

adforce.boundaries.extract_elevation_boundary_edges(netcdf_path, boundary_type=0)

Extracts the edges corresponding to elevation specified boundaries from an ADCIRC NetCDF file (like fort.63.nc or maxele.63.nc).

Parameters:
  • netcdf_path (str) – Path to the ADCIRC NetCDF file.

  • boundary_type (Optional[int]) – The specific elevation boundary type (from ‘ibtypee’) to extract. If None, extracts edges for all elevation boundary types. Defaults to 0 based on user’s data.

Returns:

An array of boundary edges, shape (num_BC_edges, 2),

containing pairs of 0-based node indices. Returns an empty array if no relevant boundaries are found.

Return type:

np.ndarray

Raises:
  • FileNotFoundError – If the netcdf_path does not exist.

  • KeyError – If required variables (‘nbdv’, ‘nvdll’, ‘ibtypee’) are missing from the NetCDF file.

Doctests:
>>> # Create a dummy NetCDF file for testing
>>> dummy_path = "dummy_adcirc_boundary.nc"
>>> with nc.Dataset(dummy_path, 'w') as ds:
...     _ = ds.createDimension('node', 5)
...     _ = ds.createDimension('neta', 4) # Total nodes in elevation boundaries
...     _ = ds.createDimension('nope', 1) # Number of elevation boundary segments
...     # Node indices (1-based in file, adjusted to 0-based: 0, 1, 3, 4)
...     nbdv_var = ds.createVariable('nbdv', 'i4', ('neta',))
...     nbdv_var[:] = np.array([1, 2, 4, 5])
...     # Segment lengths (one segment with 4 nodes)
...     nvdll_var = ds.createVariable('nvdll', 'i4', ('nope',))
...     nvdll_var[:] = np.array([4])
...     # Segment types (one segment of type 0)
...     ibtypee_var = ds.createVariable('ibtypee', 'i4', ('nope',))
...     ibtypee_var[:] = np.array([0])
>>> extract_elevation_boundary_edges(dummy_path, boundary_type=0)
masked_array(
data=[[0, 1],
       [1, 3],
       [3, 4]],
mask=False,
fill_value=999999,
dtype=int32)
>>> # Test with type mismatch
>>> extract_elevation_boundary_edges(dummy_path, boundary_type=1)
array([], shape=(0, 2), dtype=int64)
>>> # Test with multiple segments (requires adjusting dimensions/data)
>>> # Cleanup dummy file
>>> os.remove(dummy_path)
adforce.boundaries.find_boundaries_in_fort63(path='/home/docs/checkouts/readthedocs.org/user_builds/worstsurge/checkouts/latest/data/fort.63.nc', typ='medium')

Plot the boundaries in the ADCIRC mesh from a fort.63 netCDF file.

Parameters:
  • path (str) – Path to the fort.63 netCDF file. Defaults to FORT63_EXAMPLE.

  • typ (str) – Type of mesh for labeling purposes. Defaults to “medium”.

adforce.check_training_runs module

Let’s go through the training ADICRC runs and check if they were sucessful.

# TODO: add additional checks for - maxele values (find maxele, if too high, mark as failed) - check if output files are generated (fort.63, fort.64, fort.73, fort.74)

adforce.check_training_runs.status_list(parent_dir, save_to=None)

Check the status of ADCIRC training runs in the given parent directory.

Parameters:
  • parent_dir (str) – Path to the parent directory containing run subdirectories.

  • save_to (Optional[str], optional) – Path to save the data to if successful an if a directory is given. Defaults to None.

Returns:

A list of booleans indicating success (True) or failure (False) for each run.

Return type:

List[bool]

adforce.config module

A file to store the configuration formatters for the adforce package.

We use hydra to manage the configuration files. This file contains the dataclass that will be used to store the configuration parameters.

# TODO: Implement the configuration dataclass for the adforce package.

adforce.config.ChavasCollision

A dataclass to store the impact location of the Chavas Lin and Emanuel (2015) profile collision on a straight track.

class adforce.config.ChavasCollision(impact_lon, impact_lat)

Bases: object

impact_lat: float
impact_lon: float
adforce.config.load_config(path)

Load the configuration file for a wrapped ADCIRC run.

Parameters:

path (str) – path to the config file.

Returns:

loaded configuration.

Return type:

DictConfig

adforce.config.save_config(cfg)

Save the configuration file for a wrapped ADCIRC run.

Parameters:

cfg (DictConfig) – configuration.

Return type:

None

adforce.constants module

Constants for adforce package.

adforce.dual_graph module

Dual graph plotting module.

Dual graph functions in .mesh currently

adforce.dual_graph.make_dual_graph_nc()

Make full example dual graph netcdf file.

Return type:

None

adforce.dual_graph.plot_dual_graph()

Some test plots for the dual graph.

Return type:

None

adforce.dual_graph.plot_swegnn_ghost_cells(path_in, output_path)

Calculates SWE-GNN data on the fly and plots components to verify ghost cells.

This plot shows: 1. The original triangular mesh (faint gray). 2. The dual graph face centers and edges (faint blue). 3. The original mesh boundary edges (thick green). 4. The real boundary faces (red dots). 5. The calculated ghost faces (cyan dots). 6. The mirroring lines connecting boundary faces to ghost faces (dashed magenta). 7. The calculated ghost nodes (orange ‘x’).

Parameters:
  • path_in (str) – Path to the directory containing fort.*.nc files (e.g., …/exp_0049).

  • output_path (str) – Path to save the output plot (e.g., …/figure.pdf).

Return type:

None

adforce.dual_graph.test_dual_graph()

Test dual graph maker with a simple hexagon of triangles.

Return type:

None

adforce.fort22 module

fort.22.nc file creation functions.

adforce.fort22.add_psfc_u10(ds, tc_config=None, background_pressure=1010, v_reduc=0.8, geoid='sphere')

Add pressure and velocity fields to an existing netcdf dataset.

Parameters:
  • ds (nc.Dataset) – reference to input netcdf4 dataset.

  • tc_config (Optional[dict], optional) – A dictionary containing the information necesary to reconstruct the tropical cyclone trajectory. Defaults to None. If None, the fields are left blank (velocity fields are zero and pressure is set to background_pressure).

  • background_pressure (float) – background pressure to assume in mb. Defaults to 1010 mb.

Returns:

reference to transformed netcdf4 dataset.

Return type:

ds (nc.Dataset)

If tc_config[‘use_lc12’][‘value’] is True, a uniform background wind following Lin & Chavas (2012) is superposed on the symmetric field using parameters ‘translation_speed_factor’ and ‘rotation_angle’.

adforce.fort22.clon_clat_from_config_and_times(cfg, itime, times, geoid='sphere')

Get the center of the storm from the config and times.

Parameters:
  • cfg (dict) – A dictionary containing the information necesary to reconstruct the tropical cyclone trajectory.

  • itime (int) – time in minutes from start of calendar for impacts.

  • times (np.ndarray) – times in minutes from start of calendar.

Returns:

center of the storm at each time step.

Return type:

clons, clats (np.ndarray, np.ndarray)

adforce.fort22.create_fort22(nc_path, grid_config, tc_config)

Create a blank fort.22.nc file with the specified grid and TC configurations.

Parameters:
  • nc_path (str) – Path to the netCDF4 file.

  • grid_config (dict) – Grid configuration dictionary.

  • tc_config (dict) – Idealized TC configuration dictionary.

Return type:

None

adforce.fort22.gen_ps_f(profile_path_or_dict='/home/docs/checkouts/readthedocs.org/user_builds/worstsurge/checkouts/latest/w22/data/outputs.json')

Generate the interpolation function from an azimuthally symetric windprofile. (Gradient winds)

This should potentially be changed to allow each timestep to have a different profile.

That would require some clever way of interpolating each time step.

Parameters:

profile_path_or_dict (Union[str, dict], optional) – path to wind profile file or dictionary containing the wind profile. Defaults to os.path.join(CLE_DATA_PATH, “outputs.json”).

Returns:

interpolation function.

Return type:

Callable[[np.ndarray], Tuple[np.ndarray, np.ndarray]]

adforce.fort22.moving_rectilinear_square(ds, grid_config, tc_config, geoid='sphere')

Create a rectilinear grid with square cells based on config. The grid moves based on the translation speed and angle of the tropical cyclone.

Parameters:
  • ds (nc.Dataset) – reference to input netcdf4 dataset.

  • grid_config (dict) – A dictionary containing the information necesary to reconstruct the coordinates.

  • tc_config (dict) – A dictionary containing the information necesary to reconstruct the tropical cyclone trajectory.

Returns:

reference to transformed netcdf4 dataset.

Return type:

ds (nc.Dataset)

adforce.fort22.plot_trajectories(grid_config, tc_config, geoid='sphere')

Plot the trajectories of the tropical cyclone.

Parameters:
  • ds (nc.Dataset) – reference to input netcdf4 dataset.

  • tc_config (dict) – A dictionary containing the information necesary to reconstruct the tropical cyclone trajectory.

  • geoid (Literal["pyproj", "sphere"], optional) – Geoid type. Defaults to “sphere”.

Return type:

None

adforce.fort22.rectilinear_square(ds, grid_config)

Create a rectilinear grid with square cells based on grid_config.

Parameters:

grid_config (dict) – A dictionary containing the information necesary to reconstruct the coordinates.

Return type:

Dataset

adforce.fort22datatree module

adforce.fort22datatree.read_fort22(fort22_path=None)

Read fort.22.nc file.

Parameters:

fort22_path (Optional[str], optional) – Path to fort.22.nc file. Defaults to None. If not provided, the default path will be used.

Returns:

Dataset containing fort.22.nc data.

Return type:

xr.Dataset

adforce.fort61 module

Fort.61

adforce.fort61.plot_model_tgauges(folder='../../kat.nws13/')

Plot the zeta timeseries from the fort.61 file.

Parameters:

folder (str, optional) – Path to the fort.61 file. Defaults to “../../kat.nws13/”.

Return type:

None

adforce.fort63 module

Read output files fort.63.nc, maxele.63.nc etcs.

Should probably add more plotting and trimming files here.

Fort.63 files output to unstructured grid netCDF4 files.

However this is a general feature, so perhaps a grid script would be useful.

adforce.fort63.kat2()
Return type:

None

adforce.fort63.plot_nearby(data_folder='../../NWS13set4/', bbox=[('Latitude bounds', [28.5, 30.8], 'degrees_north'), ('Longitude bounds', [-92, -86.5], 'degrees_east'), 'New Orleans Area Bounding Box'], point=[('Latitude', 29.9511, 'degrees_north'), ('Longitude', -90.0715, 'degrees_east'), 'New Orleans'], number=10, pad=2, plot_mesh=True, overtopping=False)

Plot zeta timeseries points from the mesh near a point. Designed for comparison with tidal gauge data.

Parameters:
  • data_folder (str, optional) – _description_. Defaults to “../../NWS13set4/”.

  • bbox (BoundingBox, optional) – _description_. Defaults to NO_BBOX.

  • point (Point, optional) – _description_. Defaults to NEW_ORLEANS.

  • number (int, optional) – _description_. Defaults to 10.

  • pad (float, optional) – _description_. Defaults to 2.

  • plot_mesh (bool, optional) – _description_. Defaults to True.

  • overtopping (bool, optional) – _description_. Defaults to False.

Return type:

None

adforce.fort63.plot_quiver_height(path_in='mult1', time_i=160, x_pos=0.95, y_pos=-0.15)

Plot quiver height.

Parameters:
  • path_in (str, optional) – name of data folder. Defaults to “mult1”.

  • time_i (int, optional) – time_i. Defaults to 185.

  • x_pos (float, optional) – x_pos. Defaults to 0.95.

  • y_pos (float, optional) – y_pos. Defaults to -0.15.

Return type:

None

adforce.generate_training_data module

adforce.geo module

adforce.geo

Module for geographic calculations, either using a sphere or a GEOID.

adforce.geo.distances_bearings_to_center_pyproj(lon_mat, lat_mat, lon_c, lat_c)

Computes the geodesic distances (meters) and bearing (degrees) from each location in a 2D or 3D array of longitudes/latitudes to the corresponding central point(s).

The central longitude(s) and latitude(s) can be scalars (0D) or 1D arrays, while the location arrays can be 2D or 3D. Broadcasting rules apply:

  • If lon_c and lat_c are scalars (0D), they apply to all points in lon_mat/lat_mat.

  • If lon_c and lat_c are 1D, their shape must match the first dimension(s) of lon_mat/lat_mat.

This is the main bottleneck of the code. It use the pyproj Geod class to calculate the geodesic distances and bearing.

Bearing is measured clockwise from north (0° = north, 90° = east, etc.), and is the direction from each (lon_mat, lat_mat) location toward (lon_c, lat_c).

Parameters:
  • lon_mat (array-like) – 2D or 3D array of longitudes (in degrees).

  • lat_mat (array-like) – 2D or 3D array of latitudes (in degrees).

  • lon_c (float or array-like) – 0D or 1D array of central longitude(s) (in degrees).

  • lat_c (float or array-like) – 0D or 1D array of central latitude(s) (in degrees).

Returns:

2D or 3D array of geodesic distancess (in meters),

broadcasted to match the shape of lon_mat/lat_mat.

bearing_arr (np.ndarray): 2D or 3D array of bearings (in degrees, [0, 360)),

broadcasted to match the shape of lon_mat/lat_mat.

Return type:

dist_arr (np.ndarray)

Examples

>>> # Example 1: Single (scalar) center point, 2D location arrays
>>> import numpy as np
>>> lon_mat = np.array([[0.0,  1.0], [2.0,  3.0]])
>>> lat_mat = np.array([[50.0, 51.0], [52.0, 53.0]])
>>> center_lon, center_lat = 1.5, 51.5
>>> dist, bearing = distances_bearings_to_center_pyproj(lon_mat, lat_mat, center_lon, center_lat)
>>> dist.shape, bearing.shape
((2, 2), (2, 2))
>>> # Example 2: 1D center arrays, 2D location arrays: each row uses a different center
>>> lon_mat2 = np.array([[0.0,  1.0], [10.0,  11.0]])
>>> lat_mat2 = np.array([[ 0.0,  1.0], [ 5.0,   6.0]])
>>> center_lons = np.array([0.0, 10.0])  # shape (2,)
>>> center_lats = np.array([0.0,  5.0])  # shape (2,)
>>> dist2, bearing2 = distances_bearings_to_center_pyproj(lon_mat2, lat_mat2, center_lons, center_lats)
>>> dist2.shape, bearing2.shape
((2, 2), (2, 2))
>>> # The first row is measured to center (0,0), second row to center (10,5).
adforce.geo.distances_bearings_to_center_sphere(lon_mat, lat_mat, lon_c, lat_c)

Distance and bearing from each grid point to its center(s) on a sphere.

Parameters:
Return type:

tuple[ndarray[Any, dtype[float32]], ndarray[Any, dtype[float32]]]

Returns:

(distances_m, bearing_deg) matching lon_mat shape (float32).

Examples

>>> import numpy as np
>>> lon = np.array([[0., 1.], [2., 3.]], dtype=np.float32)
>>> lat = np.array([[0., 1.], [2., 3.]], dtype=np.float32)
>>> d, b = distances_bearings_to_center_sphere(lon, lat, 0.0, 0.0)
>>> d.dtype, b.dtype
(dtype('float32'), dtype('float32'))
>>> round(float(b[0, 1]), 1)
225.0
adforce.geo.forward_point_sphere(lon0, lat0, bearing, distances)

Destination point along a great-circle path on a sphere (float32).

Parameters:
Return type:

tuple[ndarray[Any, dtype[float32]], ndarray[Any, dtype[float32]]]

Returns:

(lon_dest, lat_dest) in degrees, float32.

Examples

>>> lon2, lat2 = forward_point_sphere(0.0, 0.0, 90.0, 1000.0)
>>> np.isclose(float(lat2), 0.0)
True
>>> round(float(lon2), 4)
0.009
adforce.geo.haversine_dist_bearing(lon1, lat1, lon2, lat2)

Great-circle distances and initial bearing on a spherical Earth (float32).

Parameters:
Returns:

  • distances_m - float32 great-circle distances in metres.

  • bearing_deg - float32 initial bearing, clockwise from north in degrees [0, 360).

Return type:

Tuple (distances_m, bearing_deg)

Examples

>>> d, b = haversine_dist_bearing(0.0, 0.0, 0.0, 1.0)
>>> round(float(d), -3)        # ≈ 111. km
111000.0
>>> round(float(b), 1)
0.0
adforce.geo.line_with_impact_pyproj(impact_time, impact_lon, impact_lat, translation_speed, bearing, times)

Constructs a line of constant bearing that passes through (impact_lon, impact_lat) at impact_time. For each t in times, the object travels along this bearing at translation_speed (meters/second).

The bearing is assumed to be in degrees (clockwise from north), and longitudes and latitudes are in degrees.

Parameters:
  • impact_time (float) – The time (e.g. in seconds) at which the path intersects (impact_lon, impact_lat).

  • impact_lon (float) – Impact longitude (in degrees).

  • impact_lat (float) – Impact latitude (in degrees).

  • translation_speed (float) – Constant speed (in m/s) along the bearing.

  • bearing (float) – Constant bearing (in degrees, clockwise from north).

  • times (array-like) – Array of time values (same units as impact_time).

Returns:

A tuple of arrays (lon_arr, lat_arr) in degrees for each time in times. The shape matches the shape of the input times.

Return type:

(np.ndarray, np.ndarray)

Examples

>>> # Suppose the object passes through (2°E, 50°N) at t=10s,
>>> # traveling due north at 100 m/s.
>>> import numpy as np
>>> times = np.array([9.0, 10.0, 11.0])
>>> lon_arr, lat_arr = line_with_impact_pyproj(
...     impact_time=10.0,
...     impact_lon=2.0,
...     impact_lat=50.0,
...     translation_speed=100.0,  # m/s
...     bearing=0.0,             # due north
...     times=times
... )
>>> # At t=10.0, we should be exactly at (2, 50).
>>> round(lon_arr[1], 5), round(lat_arr[1], 5)
(2.0, 50.0)
>>> # 1 second earlier, we are about 100m south => ~0.0009 degrees of latitude
>>> round(lat_arr[0], 5)
49.9991
>>> # 1 second later, we are about 100m north => ~50.00090 degrees of latitude
>>> round(lat_arr[2], 5)
50.0009
adforce.geo.line_with_impact_sphere(impact_time, impact_lon, impact_lat, translation_speed, bearing, times)

Great-circle trajectory that passes a known impact point at a given time.

Parameters:
  • impact_time (float) – Time when the object is at (impact_lon, impact_lat). In seconds from beginning of calendar.

  • impact_lon (float) – Impact longitude (°E).

  • impact_lat (float) – Impact latitude (°N).

  • translation_speed (float) – Speed along the bearing (meters/second).

  • bearing (float) – Constant bearing (° clockwise from north).

  • times (Union[_SupportsArray[dtype[Any]], _NestedSequence[_SupportsArray[dtype[Any]]], bool, int, float, complex, str, bytes, _NestedSequence[Union[bool, int, float, complex, str, bytes]]]) – Array of times at which to evaluate the position. In seconds from beginning of calendar.

Return type:

tuple[ndarray[Any, dtype[float32]], ndarray[Any, dtype[float32]]]

Returns:

Tuple[np.ndarray, np.ndarray] (lon_arr, lat_arr) arrays (float32) matching times.

Examples

>>> times = np.array([9., 10., 11.], dtype=np.float32)
>>> lon, lat = line_with_impact_sphere(
...     impact_time=10.0,
...     impact_lon=2.0,
...     impact_lat=50.0,
...     translation_speed=100.0,
...     bearing=0.0,
...     times=times,
... )
>>> round(float(lat[1]), 5)
50.0
>>> float(lat[2]) > float(lat[1])
True
adforce.geo.parabolic_track_with_impact_pyproj(impact_time, impact_lon, impact_lat, translation_speed, bearing, curvature, times)

Accurate Ide 2022/2024-style parabolic track with constant ground speed on WGS-84.

Parameters:
  • impact_time (float) – Epoch seconds when the eye is over the point (impact_lon, impact_lat).

  • impact_lon (float) – Impact longitude (°E).

  • impact_lat (float) – Impact latitude (°N).

  • translation_speed (float) – True ground speed along the curved centre-line (m s⁻¹).

  • bearing (float) – Initial bearing (° clockwise from north; 0 ° = due N).

  • curvature (float) – Parabolic curvature r. Positive bends right, negative bends left. |r| ≃ 5 x 10⁻⁶ m⁻¹ reproduces Ide’s r = ±0.5 when s ≈ 100 km.

  • times (ArrayLike) – 1-D epoch seconds at which to sample the trajectory.

Returns:

(lon_arr, lat_arr) - float32 arrays

matching times.

Return type:

tuple[np.ndarray, np.ndarray]

Examples

Straight great-circle (curvature = 0) heading north:

>>> import numpy as np, pyproj, math
>>> t = np.array([9., 10., 11.], dtype=np.float64)*1000
>>> lon, lat = parabolic_track_with_impact_pyproj(
...     impact_time=10.0*1000, impact_lon=2.0, impact_lat=50.0,
...     translation_speed=100.0, bearing=0.0, curvature=0.0,
...     times=t)
>>> float(lat[1])                                   # at impact
50.0
>>> lat[2] > lat[1] > lat[0]                        # moves north
True
>>> # -- distance between successive eye positions ≈ 100 m * dt
>>> az12, az21, d12 = GEOD.inv(
...     lon[1], lat[1], lon[2], lat[2])
>>> abs(d12 - 100.0 * 1000) < 2.0
True

Right-hand bend (positive curvature) moves both flanks eastward:

>>> lon2, _ = parabolic_track_with_impact_pyproj(
...     impact_time=10.0*1000, impact_lon=2.0, impact_lat=50.0,
...     translation_speed=100.0, bearing=0.0, curvature=5e-6,
...     times=t)
>>> lon2[0] > lon2[1] and lon2[2] > lon2[1]       # middle = impact
True

Left-hand bend (negative curvature) decreases longitude:

>>> lon3, _ = parabolic_track_with_impact_pyproj(
...     impact_time=10.0*1000, impact_lon=2.0, impact_lat=50.0,
...     translation_speed=100.0, bearing=0.0, curvature=-5e-6,
...     times=t)
>>> lon3[0] < 2.0 and lon3[2] < 2.0                # middle = 2.0
True
Should match behaviour of line_with_impact_pyproj if curvature is zero.
>>> lonc, latc = parabolic_track_with_impact_pyproj(
...     impact_time=10.0*1000, impact_lon=2.0, impact_lat=50.0,
...     translation_speed=100.0, bearing=45, curvature=0.0,
...     times=t)
>>> lonl, latl =  line_with_impact_pyproj(
...     impact_time=10.0*1000, impact_lon=2.0, impact_lat=50.0,
...     translation_speed=100.0, bearing=45, times=t)
>>> np.allclose(lonl, lonc)
True
>>> np.allclose(latl, latc)
True
adforce.geo.parabolic_track_with_impact_sphere(impact_time, impact_lon, impact_lat, translation_speed, bearing, curvature, times)

Return the longitude/latitude of a storm centre that follows Ide-style parabolic track and crosses a chosen impact point at a specified time.

Parameters:
  • impact_time (float) – Epoch seconds when the eye is exactly over (impact_lon, impact_lat).

  • impact_lon (float) – Impact longitude in degrees East.

  • impact_lat (float) – Impact latitude in degrees North.

  • translation_speed (float) – Forward speed along the initial bearing (metres s⁻¹).

  • bearing (float) – Initial bearing, degrees clockwise from north (0 ° = due north).

  • curvature (float) – Parabolic curvature r. * Positive → track bends right of the bearing. * Negative → bends left. * |r| ≈ 0.5 ≙ turning-radius ≈ 110 km.

  • times (ArrayLike) – 1-D epoch seconds at which to evaluate the position.

Returns:

(lon_arr, lat_arr) — float32 arrays matching times.

Return type:

tuple[np.ndarray, np.ndarray]

Notes

  • Setting curvature = 0 collapses to great-circle motion identical to the original line_with_impact_sphere helper.

  • Uses small-angle ENU projection (good for ≤ 300 km cross-track). Swap Section 4 with a geodesic solver for long crossings.

Examples

Straight track (no curvature) heading north:

>>> import numpy as np
>>> t = np.array([9., 10., 11.], dtype=np.float32)
>>> lon, lat = parabolic_track_with_impact_sphere(
...     impact_time=10.0, impact_lon=2.0, impact_lat=50.0,
...     translation_speed=100.0, bearing=0.0, curvature=0.0,
...     times=t)
>>> float(lat[1])
50.0
>>> lat[2] > lat[1] > lat[0]
True

Right-hand bend (positive curvature) increases longitude:

>>> lon2, _ = parabolic_track_with_impact_sphere(
...     impact_time=10.0, impact_lon=2.0, impact_lat=50.0,
...     translation_speed=100.0, bearing=0.0, curvature=0.5,
...     times=t)
>>> round(float(lon2[1]), 6)
2.0
>>> lon2[2] > lon2[1]
True

Left-hand bend (negative curvature) decreases longitude:

>>> lon3, _ = parabolic_track_with_impact_sphere(
...     impact_time=10.0, impact_lon=2.0, impact_lat=50.0,
...     translation_speed=100.0, bearing=0.0, curvature=-0.5,
...     times=t)
>>> lon3[0] < 2.0 and lon3[2] < 2.0   # middle point is exactly 2.0
True

adforce.mesh module

Process ADCIRC meshes efficiently vectorized/sparses. This is the shared functionality for processing ADCIRC meshes.

fort.63 format: dimensions:

time: (Unlimited) (604 currently) node: (31435) nele: (58368) nvertex: (3) nope: (1) neta: (103) max_nvdll: (103) nbou: (59) nvel: (4514) max_nvell: (3050) mesh: (1)

coordinates:

time (time): (604) x (node): (31435) longitude, degrees_east y (node): (31435) latitude, degrees_north element (nele, nvertex): (58368, 3) connectivity node (node): (31435)

data_vars:

depth (node): (31435) depth, meters zeta (time, node): (604, 31435) water surface elevation, meters

fort.74 format: ditto but with u-vel and v-vel instead of zeta

fort.73 format: ditto but “pressure” instead of “zeta”

fort.74 format: ditto but “u-vel” and “v-vel” instead of “zeta”

adforce.mesh.bbox_mesh(file_path='/home/docs/checkouts/readthedocs.org/user_builds/worstsurge/checkouts/latest/data/fort.63.nc', bbox=[('Latitude bounds', [28.5, 30.8], 'degrees_north'), ('Longitude bounds', [-92, -86.5], 'degrees_east'), 'New Orleans Area Bounding Box'], use_dask=True)

Load an adcirc output file and filter it to a bounding box.

Parameters:
  • file_path (str, optional) – Path to ADCIRC file. Defaults to “../data/fort.63.nc”.

  • bbox (BoundingBox, optional) – Bounding box to trim to. Defaults to NO_BBOX.

  • use_dask (bool, optional) – Whether to use dask. Defaults to True.

Returns:

Filtered xarray dataset.

Return type:

xr.Dataset

adforce.mesh.calc_ghost_cells_for_swegnn(path)

Calculate ghost cells for elevation boundary conditions from ADCIRC fort.63.nc file.

Parameters:

path (str) – Path to the directory containing fort.63.nc file.

Returns:

Dataset containing ghost cell information for elevation boundary conditions.

Return type:

xr.Dataset

adforce.mesh.calculate_adjacency_matrix(triangles, N, sparse=True)

Calculate a boolean adjacency matrix for a mesh of triangles. Assumes that nodes are numbered from 0 to N-1. Assumes nodes are not self-adjacent.

Parameters:
  • triangles (np.ndarray) – Mx3 array of triangle indices.

  • N (int) – Number of nodes in the mesh.

  • sparse (bool, optional) – Whether to return a sparse matrix. Defaults to True.

Returns:

NxN symetric boolean adjacency matrix.

Return type:

Union[np.ndarray, csr_matrix]

Examples::
>>> np.all(calculate_adjacency_matrix(np.array([[0, 1, 2]]), 3, sparse=False) == np.array([[False, True, True], [True, False, True], [True, True, False]]))
True
>>> np.all(calculate_adjacency_matrix(np.array([[0, 1, 2], [1, 2, 3]]), 4, sparse=False) == np.array([[False, True, True, False], [True, False, True, True], [True, True, False, True], [False, True, True, False]]))
True
adforce.mesh.calculate_boundary_edge_lengths(node_xy, edge_index_BC)

Calculates the physical length of each boundary edge.

Assumes node_xy coordinates allow for Euclidean distance calculation (e.g., meters in a projected coordinate system). For lat/lon degrees, a geodesic distance calculation would be more accurate.

Parameters:
  • node_xy (ndarray) – Array of node coordinates, shape (num_nodes, 2).

  • edge_index_BC (ndarray) – Array of node indices forming boundary edges, shape (num_BC_edges, 2).

Returns:

Array containing the length of each boundary edge,

shape (num_BC_edges,).

Return type:

np.ndarray

Doctests:
>>> node_coords = np.array([[0., 0.], [3., 0.], [3., 4.], [0., 4.]])
>>> bc_edges = np.array([[0, 1], [1, 2], [3, 0]]) # Edges (0,1), (1,2), (3,0)
>>> calculate_boundary_edge_lengths(node_coords, bc_edges)
array([3., 4., 4.])
>>> bc_edges_empty = np.empty((0, 2), dtype=int)
>>> calculate_boundary_edge_lengths(node_coords, bc_edges_empty)
array([], dtype=float64)
adforce.mesh.calculate_dual_graph_adjacency_matrix(triangles, sparse=True)

Calculate the dual graph adjacency matrix for a mesh of triangles.

A dual graph is a graph where each face of the original graph is a node, and the new nodes are adjacent if they share an edge.

Parameters:
  • triangles (np.ndarray) – Mx3 array of triangle indices.

  • sparse (bool, optional) – Whether to return a sparse matrix. Defaults to True.

Returns:

MxM symetric boolean adjacency matrix.

Return type:

Union[np.ndarray, csr_matrix]

Examples::
>>> np.all(calculate_dual_graph_adjacency_matrix(np.array([[0, 1, 2], [1, 2, 3]]), sparse=False) == np.array([[False, True], [True, False]]))
True
>>> np.all(calculate_dual_graph_adjacency_matrix(np.array([[0, 1, 2], [2, 3, 4]]), sparse=False) == np.array([[False, False], [False, False]]))
True
>>> np.all(calculate_dual_graph_adjacency_matrix(
... np.array([[1, 2, 3], [2, 3, 4], [3, 4, 5],
...           [3, 5, 6], [3, 6, 7], [7, 3, 1]]) - 1,
...          sparse=False
... ) == np.array([
... [False, True, False, False, False, True],
... [True, False, True, False, False, False],
... [False, True, False, True, False, False],
... [False, False, True, False, True, False],
... [False, False, False, True, False, True],
... [True, False, False, False, True, False]]))
True
adforce.mesh.calculate_ghost_face_coords(face_xy, face_BC, edge_midpoints, outward_normals)

Calculates coordinates for ghost faces by mirroring across boundary edges.

Parameters:
  • face_xy (ndarray) – Array of face center coordinates, shape (num_faces, 2).

  • face_BC (ndarray) – Indices of boundary faces, shape (num_BC_edges,).

  • edge_midpoints (ndarray) – Midpoints of boundary edges corresponding to face_BC, shape (num_BC_edges, 2).

  • outward_normals (ndarray) – Outward unit normal vectors for boundary edges, shape (num_BC_edges, 2).

Return type:

ndarray

Returns:

Coordinates of the ghost faces, shape (num_BC_edges, 2).

Doctests:
>>> face_coords = np.array([[1., 0.5], [3., 1.5]]) # Two face centers
>>> bc_faces = np.array([0, 1]) # Both are boundary faces
>>> midpts = np.array([[1., 0.], [3., 2.]]) # Edge midpoints
>>> normals = np.array([[0., -1.], [0., 1.]]) # Outward normals
>>> calculate_ghost_face_coords(face_coords, bc_faces, midpts, normals)
array([[ 1. , -0.5],
       [ 3. ,  2.5]])
>>> empty_faces = np.empty((0, 2))
>>> empty_idx = np.empty(0, dtype=int)
>>> empty_midpts = np.empty((0, 2))
>>> empty_normals = np.empty((0, 2))
>>> calculate_ghost_face_coords(empty_faces, empty_idx, empty_midpts, empty_normals)
array([], shape=(0, 2), dtype=float64)
adforce.mesh.calculate_ghost_node_coords(node_xy, triangles, face_BC, edge_index_BC, edge_midpoints, outward_normals)

Calculates coordinates for ghost nodes by mirroring non-boundary nodes across boundary edges. Assumes a triangular mesh.

Parameters:
  • node_xy (ndarray) – Array of node coordinates, shape (num_nodes, 2).

  • triangles (ndarray) – Array of triangle definitions, shape (num_faces, 3).

  • face_BC (ndarray) – Indices of boundary faces, shape (num_BC_edges,).

  • edge_index_BC (ndarray) – Boundary edges corresponding to face_BC, shape (num_BC_edges, 2).

  • edge_midpoints (ndarray) – Midpoints of boundary edges, shape (num_BC_edges, 2).

  • outward_normals (ndarray) – Outward unit normals for boundary edges, shape (num_BC_edges, 2).

Returns:

  • ghost_node_xy: Coordinates of ghost nodes, shape (num_BC_edges, 2)

    (assuming one ghost node per boundary face/triangle).

  • original_node_indices: Indices of the original nodes that were mirrored,

    shape (num_BC_edges,).

  • face_to_ghost_node_map: Dictionary mapping face_BC index to its

    new ghost node index(es) (after appending).

Return type:

A tuple containing

Raises:

ValueError – If helper function fails or assumptions aren’t met.

Doctests:
>>> # Tri 0: [0,1,2] nodes at (0,0), (2,0), (1,1)
>>> triangles_data = np.array([[0, 1, 2]])
>>> node_coords = np.array([[0., 0.], [2., 0.], [1., 1.]])
>>> bc_faces = np.array([0])
>>> bc_edges = np.array([[0, 1]]) # Edge (0,1) is boundary
>>> midpts = get_boundary_edge_midpoints(node_coords, bc_edges) # Should be [1., 0.]
>>> # Dummy face center needed for normal calc - use actual if available
>>> face_coords = np.array([[1., 1./3]])
>>> normals = calculate_outward_normals(node_coords, face_coords, bc_edges, bc_faces, midpts) # Should be [0., -1.]
>>> ghost_coords, orig_indices, _ = calculate_ghost_node_coords(node_coords, triangles_data, bc_faces, bc_edges, midpts, normals)
>>> # Node 2 is at (1,1). Midpoint is (1,0). Normal is (0,-1). Dist=1. Ghost=(1,0)+[(0,-1)*1] = (1,-1)
>>> np.allclose(ghost_coords, np.array([[1., -1.]]))
True
>>> orig_indices
array([2])
>>> # Test empty input
>>> empty_tris = np.empty((0,3), dtype=int)
>>> empty_nodes = np.empty((0,2))
>>> empty_faces = np.empty(0, dtype=int)
>>> empty_edges = np.empty((0,2), dtype=int)
>>> empty_midpts = np.empty((0,2))
>>> empty_normals = np.empty((0,2))
>>> ghost_coords, orig_indices, _ = calculate_ghost_node_coords(
...     empty_nodes, empty_tris, empty_faces, empty_edges, empty_midpts, empty_normals
... )
>>> ghost_coords.shape
(0, 2)
>>> orig_indices.shape
(0,)
adforce.mesh.calculate_outward_normals(node_xy, face_xy, edge_index_BC, face_BC, edge_midpoints)

Calculates outward-pointing unit normal vectors for boundary edges.

Parameters:
  • node_xy (ndarray) – Array of node coordinates, shape (num_nodes, 2).

  • face_xy (ndarray) – Array of face center coordinates, shape (num_faces, 2).

  • edge_index_BC (ndarray) – Indices of nodes forming BC edges, shape (num_BC_edges, 2).

  • face_BC (ndarray) – Indices of boundary faces corresponding to edge_index_BC, shape (num_BC_edges,).

  • edge_midpoints (ndarray) – Midpoints of boundary edges, shape (num_BC_edges, 2).

Return type:

ndarray

Returns:

Outward unit normal vectors, shape (num_BC_edges, 2).

Doctests:
>>> node_coords = np.array([[0., 0.], [2., 0.], [1., 1.]]) # Triangle 0
>>> face_coords = np.array([[1., 1./3]]) # Center of triangle 0
>>> bc_edges = np.array([[0, 1]]) # Edge (0,1) is boundary
>>> bc_faces = np.array([0]) # Face 0 is the boundary face
>>> midpts = get_boundary_edge_midpoints(node_coords, bc_edges)
>>> calculate_outward_normals(node_coords, face_coords, bc_edges, bc_faces, midpts)
array([[ 0., -1.]])
>>> node_coords = np.array([[0., 0.], [0., 2.], [1., 1.]]) # Triangle 0
>>> face_coords = np.array([[1./3, 1.]]) # Center of triangle 0
>>> bc_edges = np.array([[0, 1]]) # Edge (0,1) is boundary
>>> bc_faces = np.array([0]) # Face 0 is the boundary face
>>> midpts = get_boundary_edge_midpoints(node_coords, bc_edges)
>>> normals = calculate_outward_normals(node_coords, face_coords, bc_edges, bc_faces, midpts)
>>> np.allclose(normals, np.array([[-1., 0.]]))
True
adforce.mesh.calculate_triangle_areas(x_coords, y_coords, triangles)

Calculate the area of each triangle using the Shoelace formula.

Parameters:
  • x_coords (np.ndarray) – (N,) array of x-coordinates for all nodes.

  • y_coords (np.ndarray) – (N,) array of y-coordinates for all nodes.

  • triangles (np.ndarray) – (M, 3) array of triangle indices (0-based).

Returns:

(M,) array containing the area of each triangle.

Return type:

np.ndarray

Examples::
>>> x = np.array([0, 1, 0])
>>> y = np.array([0, 0, 1])
>>> tri = np.array([[0, 1, 2]])
>>> calculate_triangle_areas(x, y, tri)
array([0.5])
>>> x = np.array([0, 1, 1, 0])
>>> y = np.array([0, 0, 1, 1])
>>> tri = np.array([[0, 1, 2], [0, 2, 3]])
>>> calculate_triangle_areas(x, y, tri)
array([0.5, 0.5])
adforce.mesh.dual_graph_ds_base_from_triangles(triangles, x, y)

Calculate the dual graph adjacency matrix for a mesh of triangles.

Nodes are the faces of the original graph, and adjacent if they share an edge. There are E shared edges between the faces.

Parameters:
  • triangles (np.ndarray) – Mx3 array of triangle indices. Assumes nodes are numbered from 0 to N-1. where N is the number of nodes in the original mesh.

  • x (np.ndarray) – x-coordinates of the nodes. Degrees east.

  • y (np.ndarray) – y-coordinates of the nodes. Degrees north.

Returns:

Dual graph dataset.
  • ”element”: Mx3 array of triangle indices.

  • ”start”: 2E array of start indices. (Double counting)

  • ”end”: 2E array of end indices. (Double counting)

  • ”x”: M array of x-coordinates of the dual graph nodes.

  • ”y”: M array of y-coordinates of the dual graph nodes.

  • ”length”: 2E array of lengths of the edges.

  • ”unit_normal_x”: 2E array of x-coordinates of the unit normal vectors.

  • ”unit_normal_y”: 2E array of y-coordinates of the unit normal vectors.

Return type:

xr.Dataset

Examples::
>>> ds = dual_graph_ds_base_from_triangles(np.array([[0, 1, 2], [1, 2, 3]]), np.array([0, 1, 2, 3]), np.array([0, -1, -1, -2]))
>>> np.all(ds.start.values == np.array([0, 1]))
True
>>> np.all(ds.end.values == np.array([1, 0]))
True
>>> np.all(ds.element.values - 1 == np.array([[0, 1, 2], [1, 2, 3]]))
True
adforce.mesh.dual_graph_ds_from_mesh_ds(ds, take_grad='static')

Create a dual graph dataset from an ADCIRC output dataset.

Parameters:
  • ds (xr.Dataset) – ADCIRC output xarray dataset with “x”, “y”, “element” and “depth”.

  • (take_grad (take_grad) – Literal[“none”, “dynamic”, “static”, “all”], optional): Whether to calculate the gradient of the depth, zeta, u-vel, v-vel, windx, windy, pressure. Defaults to “static”.

Returns:

Dual graph dataset.

Return type:

xr.Dataset

Examples::
>>> ds = dual_graph_ds_from_mesh_ds(_test_xr_dataset(), take_grad="all")
>>> np.isclose(ds.depth.values, np.array([2, 3, 4-1/3]), atol=1e-6).all()
True
>>> np.isclose(ds.y.values, np.array([-1 +1/3, -1 - 1/3, -1 - 1/3]), atol=1e-6).all()
True
>>> np.isclose(ds.x.values, np.array([1, 2, 3 - 1/3]), atol=1e-6).all()
True
>>> np.all(ds.element.values - 1 == np.array([[0, 1, 2], [1, 2, 3], [2, 3, 4]]))
True
>>> np.isclose(ds.zeta.values, np.array([[2, 3, 4-1/3], [3, 4, 5-1/3]]), atol=1e-6).all()
True
>>> np.isclose(ds.depth_grad.values, np.array([[1, 1, 1], [0, 0, 0]]), atol=1e-6).all()
True
>>> np.isclose(ds.zeta_grad.values, np.array([[[1, 1, 1], [1, 1, 1]], [[0, 0, 0], [0, 0, 0]]]), atol=1e-6).all()
True
adforce.mesh.dual_graph_ds_from_mesh_ds_from_path(path='/home/docs/checkouts/readthedocs.org/user_builds/worstsurge/checkouts/latest/data/exp_0049', bbox=None, use_dask=True, take_grad='static')

Process the dual graph from a path to the fort.*.nc files.

Parameters:
  • path (str, optional) – Defaults to DATA_PATH

  • bbox (BoundingBox, optional) – Bounding box to filter the data. Defaults to NO_BBOX.

  • use_dask (bool, optional) – Whether to use dask. Defaults to True.

  • take_grad (bool, Literal["none", "dynamic", "static", "all"]) – Whether to calculate the gradient of the variables. Defaults to “static”.

Raises:

FileNotFoundError – If the fort.*.nc files do not exist. for * in [63, 64, 73, 74].

Returns:

Dual graph dataset.

Return type:

xr.Dataset

adforce.mesh.dual_graph_starts_ends_from_triangles(triangles, x=None, y=None, use_pyproj=False)

Generate start, end indices for the dual graph from the triangles.

TODO: This looks slow.

Parameters:
  • triangles (np.ndarray) – Mx3 array of triangle indices.

  • x (np.ndarray, optional) – x-coordinates. Defaults to None.

  • y (np.ndarray, optional) – y-coordinates. Defaults to None.

Returns:

start, end indices for the dual graph. or Tuple[List[int], List[int], List[float], List[float], List[float]]: start, end, length, xdual, ydual, unit normal x, unit normal y.

Return type:

Tuple[List[int], List[int]]

Examples::
>>> start, end = dual_graph_starts_ends_from_triangles(np.array([[0, 1, 2], [1, 2, 3]]))
>>> np.all(start == np.array([0, 1]))
True
>>> np.all(end == np.array([1, 0]))
True
>>> start, end, length, xd, yd, unx, uny = dual_graph_starts_ends_from_triangles(np.array([[0, 1, 2], [1, 2, 3]]), x=np.array([0, 1, 2, 3]), y=np.array([0, -1, -1, -2]))
>>> np.all(length == np.array([1.0, 1.0]))
True
>>> np.all(unx == np.array([0.0, 0.0]))
True
>>> np.all(uny  == np.array([-1.0, 1.0]))
True
adforce.mesh.elevation_boundary_edges(nbdv, nvdll, ibtypee, boundary_type=0)

Extract elevation boundary edges from ADCIRC mesh data.

Parameters:
  • nbdv (np.ndarray) – 1D array of node indices for elevation boundaries (0-based).

  • nvdll (np.ndarray) – 1D array of number of nodes in each elevation boundary segment.

  • ibtypee (np.ndarray) – 1D array of types for each elevation boundary segment.

  • boundary_type (Optional[int]) – Specific elevation boundary type to extract. If None, extracts edges for all types. Defaults to 0.

Returns:

Array of boundary edges, shape (num_BC_edges, 2),

containing pairs of 0-based node indices. Returns an empty array if no relevant boundaries are found.

Return type:

np.ndarray

Doctests:
>>> nbdv = np.array([0, 1, 3, 4])
>>> nvdll = np.array([4])
>>> ibtypee = np.array([0])
>>> elevation_boundary_edges(nbdv, nvdll, ibtypee, boundary_type=0)
array([[0, 1],
        [1, 3],
        [3, 4]])
>>> elevation_boundary_edges(nbdv, nvdll, ibtypee, boundary_type=1)
array([], shape=(0, 2), dtype=int64)
adforce.mesh.filter_dual_graph(dg_ds, indices)
Return type:

Dataset

adforce.mesh.filter_mesh(adc_ds, indices)

Filter an ADCIRC mesh to a subset of nodes. Keep the triangular component mesh.

Parameters:
  • adc_ds (xr.Dataset) – ADCIRC output xarray dataset with “x”, “y”, “element”.

  • indices (np.ndarray) – Indices to keep.

Returns:

Filtered xarray dataset.

Return type:

xr.Dataset

adforce.mesh.find_boundary_faces(triangles, edge_index_BC)

Finds the indices of faces (triangles) that contain boundary edges using NumPy.

Compares the edges of each triangle against a provided list of boundary edges efficiently.

Parameters:
  • triangles (ndarray) – Array of triangle definitions, shape (num_faces, 3), containing 0-based node indices for each face.

  • edge_index_BC (ndarray) – Array of boundary edges, shape (num_BC_edges, 2), containing pairs of 0-based node indices.

Returns:

A sorted array of unique indices of the faces that

contain at least one boundary edge.

Return type:

np.ndarray

Doctests:
>>> # Triangle 0: nodes [0, 1, 2], edges (0,1), (1,2), (2,0)
>>> # Triangle 1: nodes [1, 3, 2], edges (1,3), (3,2), (2,1)
>>> triangles_data = np.array([[0, 1, 2], [1, 3, 2]])
>>> # Boundary edge is (0, 1)
>>> bc_edges_data = np.array([[0, 1]])
>>> find_boundary_faces(triangles_data, bc_edges_data)
array([0])
>>> # Boundary edge is (1, 2) - shared by both triangles
>>> bc_edges_data = np.array([[1, 2]])
>>> find_boundary_faces(triangles_data, bc_edges_data)
array([0, 1])
>>> # Boundary edge is (2, 1) - reverse direction, shared by both
>>> bc_edges_data = np.array([[2, 1]])
>>> find_boundary_faces(triangles_data, bc_edges_data)
array([0, 1])
>>> # Boundary edge (3, 2) - only in triangle 1
>>> bc_edges_data = np.array([[3, 2]])
>>> find_boundary_faces(triangles_data, bc_edges_data)
array([1])
>>> # No matching boundary edges
>>> bc_edges_data = np.array([[0, 3]])
>>> find_boundary_faces(triangles_data, bc_edges_data)
array([], dtype=int64)
>>> # Empty input edges
>>> bc_edges_empty = np.empty((0, 2), dtype=int)
>>> find_boundary_faces(triangles_data, bc_edges_empty)
array([], dtype=int64)
adforce.mesh.find_non_boundary_nodes_for_faces(triangles, face_BC, edge_index_BC)

Identifies the node(s) within each boundary face that are not part of the corresponding boundary edge.

Assumes each face in face_BC corresponds uniquely to an edge in edge_index_BC (i.e., they have the same order and length).

Parameters:
  • triangles (ndarray) – Array of triangle definitions, shape (num_faces, 3), containing 0-based node indices.

  • face_BC (ndarray) – Indices of boundary faces, shape (num_BC_edges,).

  • edge_index_BC (ndarray) – Boundary edges corresponding to face_BC, shape (num_BC_edges, 2).

Return type:

Dict[int, List[int]]

Returns:

A dictionary mapping each boundary face index (from face_BC) to a list of its non-boundary node index(es).

Raises:

ValueError – If a face in face_BC is not found or if the number of boundary faces and edges don’t match.

Doctests:
>>> # Tri 0: [0,1,2], Tri 1: [1,3,2]
>>> triangles_data = np.array([[0, 1, 2], [1, 3, 2]])
>>> # BC Edge (0,1) corresponds to Face 0
>>> bc_faces_data = np.array([0])
>>> bc_edges_data = np.array([[0, 1]])
>>> find_non_boundary_nodes_for_faces(triangles_data, bc_faces_data, bc_edges_data)
{0: [2]}
>>> # BC Edge (3,2) corresponds to Face 1
>>> bc_faces_data = np.array([1])
>>> bc_edges_data = np.array([[3, 2]])
>>> find_non_boundary_nodes_for_faces(triangles_data, bc_faces_data, bc_edges_data)
{1: [1]}
>>> # BC Edges (0,1) -> Face 0; (3,2) -> Face 1
>>> bc_faces_data = np.array([0, 1])
>>> bc_edges_data = np.array([[0, 1], [3, 2]])
>>> find_non_boundary_nodes_for_faces(triangles_data, bc_faces_data, bc_edges_data)
{0: [2], 1: [1]}
>>> # Test empty input
>>> find_non_boundary_nodes_for_faces(triangles_data, np.array([]), np.empty((0,2)))
{}
adforce.mesh.get_boundary_edge_midpoints(node_xy, edge_index_BC)

Calculates the midpoint of each boundary edge.

Parameters:
  • node_xy (ndarray) – Array of node coordinates, shape (num_nodes, 2).

  • edge_index_BC (ndarray) – Array of node indices forming boundary edges, shape (num_BC_edges, 2).

Return type:

ndarray

Returns:

Midpoints of boundary edges, shape (num_BC_edges, 2).

Doctests:
>>> node_coords = np.array([[0., 0.], [2., 0.], [2., 2.], [0., 2.]])
>>> bc_edges = np.array([[0, 1], [1, 2]]) # Edges (0,1) and (1,2)
>>> get_boundary_edge_midpoints(node_coords, bc_edges)
array([[1., 0.],
       [2., 1.]])
>>> bc_edges_empty = np.empty((0, 2), dtype=int)
>>> get_boundary_edge_midpoints(node_coords, bc_edges_empty)
array([], shape=(0, 2), dtype=float64)
adforce.mesh.get_elevation_boundary_edges(ds, boundary_type=0)

Helper function to extract elevation boundary edges from an open netCDF4 Dataset object.

Parameters:
  • ds (nc.Dataset) – Open netCDF4 Dataset object.

  • boundary_type (Optional[int]) – The specific elevation boundary type (from ‘ibtypee’) to extract. If None, extracts edges for all elevation boundary types. Defaults to 0.

Returns:

An array of boundary edges, shape (num_BC_edges, 2),

containing pairs of 0-based node indices. Returns an empty array if no relevant boundaries are found.

Return type:

np.ndarray

adforce.mesh.grad_for_triangle_static(x_lon, y_lat, z_values, triangles)

Calculate the gradient of z-values for each triangle, assuming z_values has shape (N,).

Three points define a plane, so we can calculate the gradient of that plane (dz/dx, dz/dy).

Parameters:
  • x_lon (np.ndarray) – (N,) array of x-coordinates.

  • y_lat (np.ndarray) – (N,) array of y-coordinates.

  • z_values (np.ndarray) – (N,) array of z-values.

  • triangles (np.ndarray) – (M, 3) array of triangle indices.

Returns:

[0, :] -> dz/dx for each triangle [1, :] -> dz/dy for each triangle

Return type:

np.ndarray of shape (2, M)

Examples

>>> import numpy as np
>>> # First test
>>> np.all(
...   np.isclose(
...     grad_for_triangle_static(
...         np.array([0, 0, 1]),
...         np.array([0, 1, 0]),
...         np.array([1, 1, 0]),
...         np.array([[0, 1, 2]])
...     ),
...     np.array([[-1], [0]]),
...     atol=1e-6
...   )
... )
True
>>> # Second test
>>> np.all(
...   grad_for_triangle_static(
...     np.array([0, 0, 1]),
...     np.array([0, 1, 0]),
...     np.array([1, 0, 0]),
...     np.array([[0, 1, 2]])
...   ) == np.array([[-1], [-1]])
... )
True
>>> # Third test
>>> np.all(
...   grad_for_triangle_static(
...     np.array([0, 0, 2]),
...     np.array([0, 2, 0]),
...     np.array([1, 0, 0]),
...     np.array([[0, 1, 2], [1, 2, 0]])
...   ) == np.array([[-0.5, -0.5], [-0.5, -0.5]])
... )
True
>>> # Fourth test
>>> np.all(
...   grad_for_triangle_static(
...     np.array([0, 0, 0.5]),
...     np.array([0, 0.5, 0]),
...     np.array([1, 0, 0]),
...     np.array([[0, 1, 2]])
...   ) == np.array([[-2], [-2]])
... )
True
>>> # Fifth test
>>> np.all(
...   grad_for_triangle_static(
...     np.array([0, 0, 0.5]),
...     np.array([0, 0.5, 0]),
...     np.array([2, 0, 0]),
...     np.array([[0, 1, 2]])
...   ) == np.array([[-4], [-4]])
... )
True
>>> # Sixth test
>>> np.all(
...   grad_for_triangle_static(
...     np.array([0, 0, 1]),
...     np.array([0, 1, 0]),
...     np.array([0, 0, 0]),
...     np.array([[0, 1, 2]])
...   ) == np.array([[0], [0]])
... )
True
>>> # Seventh test
>>> np.all(
...   grad_for_triangle_static(
...     np.array([0, 0, 1]),
...     np.array([0, 1, 0]),
...     np.array([0, 1, 0]),
...     np.array([[0, 1, 2]])
...   ) == np.array([[0], [1]])
... )
True
adforce.mesh.grad_for_triangle_timeseries(x_lon, y_lat, z_values, triangles)

Calculate the gradient of z-values for each triangle, assuming z_values has shape (T, N).

Three points define a plane, so we can calculate the gradient (dz/dx, dz/dy). We do it for each of the T time steps.

Parameters:
  • x_lon (np.ndarray) – (N,) array of x-coordinates.

  • y_lat (np.ndarray) – (N,) array of y-coordinates.

  • z_values (np.ndarray) – (T, N) array of z-values at each of T times.

  • triangles (np.ndarray) – (M, 3) array of triangle indices.

Returns:

[0, t, m] -> dz/dx for triangle m at time t [1, t, m] -> dz/dy for triangle m at time t

Return type:

np.ndarray of shape (2, T, M)

Examples

>>> import numpy as np
>>> # Suppose we have T=2 snapshots and N=3 nodes
>>> zt = np.array([
...     [1, 1, 0],  # time=0
...     [0, 1, 0]   # time=1
... ])
>>> tri = np.array([[0, 1, 2]])
>>> result = grad_for_triangle_timeseries(
...     np.array([0, 0, 1]),
...     np.array([0, 1, 0]),
...     zt,
...     tri
... )
>>> result.shape
(2, 2, 1)
>>> # We can also test that the first time-step's gradient
>>> # matches grad_for_triangle_static if we pass in the same z-values
>>> static_grad = grad_for_triangle_static(
...     np.array([0, 0, 1]),
...     np.array([0, 1, 0]),
...     np.array([1, 1, 0]),
...     tri
... )
>>> np.allclose(result[:, 0, :], static_grad)  # compare time=0 vs static
True
adforce.mesh.mean_for_triangle(values, triangles)

Calculate the mean value for each triangle.

Parameters:
  • values (np.ndarray) – N array of values.

  • triangles (np.ndarray) – Mx3 array of triangle indices.

Returns:

M array of mean values for each triangle.

Return type:

np.ndarray

Examples::
>>> np.all(mean_for_triangle(np.array([1, 2, 3, 4]), np.array([[0, 1, 2], [1, 2, 3]])) == np.array([2, 3]))
True
>>> np.all(mean_for_triangle(np.array([1, 2, 3, 4]), np.array([[0, 1, 2]])) == np.array([2]))
True
adforce.mesh.plot_contour(ax, x_values, y_values, adj_matrix)

Plot a mesh with edges.

Parameters:
  • ax (Axes) – matplotlib axis.

  • x_values (np.ndarray) – x values of nodes.

  • y_values (np.ndarray) – y values of nodes.

  • adj_matrix (Union[np.ndarray, csr_matrix]) – adjacency matrix.

Return type:

None

adforce.mesh.select_coast(mesh_ds, overtopping=False, keep_sparse=False)

Select the coastal nodes.

Parameters:
  • mesh_ds (xr.Dataset) – ADCIRC output xarray dataset with “x”, “y”, “element” (and “depth” if overtopping allowed).

  • overtopping (bool, optional) – Whether overtopping is included in mesh. Defaults to False.

Returns:

coastal indices, adjacency matrix.

Return type:

Tuple[np.ndarray, np.ndarray]

adforce.mesh.select_coast_mesh(mesh_ds, overtopping=False)

Select the coastal mesh.

Parameters:
  • mesh_ds (xr.Dataset) – ADCIRC output xarray dataset with “x”, “y”, “element” (and “depth” if overtopping allowed).

  • overtopping (bool, optional) – Whether overtopping is included in mesh. Defaults to False.

Returns:

Filtered xarray dataset.

Return type:

xr.Dataset

adforce.mesh.select_nearby(mesh_ds, lon, lat, number=10, overtopping=False, verbose=False)

Select edge cells near a point.

Parameters:
  • mesh_ds (xr.Dataset) – ADCIRC output xarray dataset with “x”, “y” and “element”.

  • lon (float) – Longitude of central point (degree_East).

  • lat (float) – Latitude of central point (degree_North).

  • number (int, optional) – How many to choose initially. Defaults to 10.

  • overtopping (bool, optional) – Whether overtopping is included in mesh. Defaults to False.

  • verbose (bool, optional) – Whether to print. Defaults to False.

Returns:

coastal indices near central point.

Return type:

xr.Dataset

adforce.mesh.standard_starts_ends_from_triangles(triangles)

Generate start, end indices from the triangles for the regular graph.

Parameters:

triangles (np.ndarray) – Mx3 array of triangle indices.

Returns:

start, end indices for the adjacency matrix.

Return type:

Tuple[np.ndarray, np.ndarray]

Examples::
>>> start, end = standard_starts_ends_from_triangles(np.array([[0, 1, 2], [1, 2, 3]]))
>>> np.all(start == np.array([2, 1, 0, 2, 1, 0, 3, 2, 1, 3, 2, 1]))
True
>>> np.all(end == np.array([0, 0, 1, 1, 2, 2, 1, 1, 2, 2, 3, 3]))
True
adforce.mesh.starts_ends_to_adjacency_matrix(starts, ends, size, sparse=True)

Convert start, end indices to adjacency matrix.

Parameters:
  • starts (Union[List[int], np.ndarray]) – start indices.

  • ends (Union[List[int], np.ndarray]) – end indices.

  • size (int) – number of nodes.

  • sparse (bool, optional) – Whether to return a sparse matrix. Defaults to True.

Returns:

adjacency matrix.

Return type:

Union[np.ndarray, csr_matrix]

adforce.mesh.swegnn_dg_from_mesh_ds_from_path(path='/home/docs/checkouts/readthedocs.org/user_builds/worstsurge/checkouts/latest/data/exp_0049', time_downsampling=36, use_dask=True)

Get the dual graph into the format required for the SWE-GNN model.

Transforms the output of dual_graph_ds_from_mesh_ds_from_path to match the expected variable names and structure for the mSWE-GNN input pipeline.

Parameters:
  • path (str, optional) – Path to ADCIRC fort.*.nc files. Defaults to DATA_PATH/exp_0049.

  • time_downsampling (int, optional) – Factor to downsample time dimension. Defaults to 36.

  • use_dask (bool, optional) – Whether to use dask for loading. Defaults to True.

Returns:

Dual graph dataset formatted for mSWE-GNN processing.

Return type:

xr.Dataset

adforce.mesh.swegnn_netcdf_creation(path_in, path_out, time_downsampling=36, use_dask=True)

Create a netCDF file formatted for mSWE-GNN from ADCIRC fort.*.nc files.

Parameters:
  • path_in (str) – Path to the directory containing fort.*.nc files.

  • path_out (str) – Path to save the output netCDF file. E.g. “swegnn_data.nc”.

  • time_downsampling (int, optional) – Factor to downsample time dimension. Defaults to 36.

  • use_dask (bool, optional) – Whether to use dask for loading. Defaults to True.

Return type:

None

adforce.mesh.xr_loader(file_path, verbose=False, use_dask=False)

Load an xarray dataset from a ADCIRC netCDF4 file.

ADCIRC netCDF4 files have some scalar variables that need to be removed.

Parameters:
  • file_path (str) – Path to netCDF4 file.

  • verbose (bool, optional) – Whether to print. Defaults to False.

  • use_dask (bool, optional) – Whether to use dask. Defaults to False.

Returns:

loaded xarray dataset (lazy).

Return type:

xr.Dataset

adforce.process_single module

Processes a single, completed ADCIRC run directory into SWE-GNN format.

This script is intended to be called by a Slurm job array, where each task processes one directory.

adforce.process_single.check_and_process_run(run_path, save_path, use_dask)

Checks a single run for success and processes it if valid.

Parameters:
  • run_path (str) – Path to the single run directory (e.g., …/run_5sec/exp_0001).

  • save_path (str) – Full path for the output .nc file.

  • use_dask (bool) – Whether to use dask for loading.

Returns:

True if processing was successful, False otherwise.

Return type:

bool

adforce.profile module

adforce/profile.py: Profile module for adforce.

adforce.profile.pressures_profile(rr, vv, fcor=5e-05, p0=101500, rho0=1.15)

Add pressure profile to wind profile.

Parameters:
  • rr (np.ndarray) – Radii [m].

  • vv (np.ndarray) – Wind speeds [m/s].

  • fcor (float, optional) – Coriolis parameter [s-1]. Defaults to 5e-5.

  • p0 (float, optional) – Background pressure [Pa]. Defaults to 1015 * 100.

  • rho0 (float, optional) – Background density [kg m-3]. Defaults to 1.15.

Returns:

Pressures [hPa].

Return type:

np.ndarray

adforce.profile.read_profile(profile_path)

Read a TC profile from a JSON file.

Parameters:

profile_path (str) – Path to the JSON file.

Returns:

Dataset containing the wind speeds and pressures

of the TC profile. The dataset has the following data variables: - windspeeds: Wind speeds [m/s]. - pressures: Pressures [hPa]. The dataset has the following coordinate: - radii: Radii [m].

Return type:

xr.Dataset

Raises:

ValueError – If the profile does not contain ‘rr’ and ‘VV’ keys.

Examples

>>> import os
>>> from w22.constants import DATA_PATH
>>> profile_ds = read_profile(os.path.join(DATA_PATH, "2025.json"))
>>> assert "windspeeds" in profile_ds.data_vars
>>> assert "pressures" in profile_ds.data_vars
>>> assert "radii" in profile_ds.coords

adforce.slurm module

adforce.slurm.is_slurm_job_failed(jid)

Check if a SLURM job is failed using sacct.

Parameters:

jid (int) – Job ID.

Returns:

Whether the job is failed.

Return type:

bool

adforce.slurm.is_slurm_job_finished(jid)

Check if a SLURM job is finished using sacct.

Parameters:

jid (int) – Job ID.

Returns:

Whether the job is finished.

Return type:

bool

adforce.slurm.setoff_slurm_job_and_wait(direc, config)

Run the ADCIRC model and wait for it to finish.

Uses slurmpy to set off a slurm job and sacct to check if it has finished.

Parameters:
  • direc (str) – Path to ADCIRC run folder.

  • config (DictConfig) – Configuration.

Returns:

Slurm Job ID.

Return type:

int

adforce.subprocess module

Run ADCIRC with subprocess rather than slurmpy.

adforce.subprocess.setoff_subprocess_job_and_wait(direc, config)

Run the ADCIRC model in the given directory and wait for it to finish. This uses subprocess (no SLURM) to execute ADCIRC on the current node with parallelism.

Parameters:
  • direc (str) – Path to ADCIRC run folder.

  • config (DictConfig) – Configuration object with required settings.

Returns:

An integer status code (0 if successful, non-zero if errors occurred).

Return type:

int

adforce.tc_plots module

Module for plotting TC tracks and simulation summary data.

This script generates a summary plot of all U.S. landfalling TC tracks on the ADCIRC mesh, colored by intensity.

Usage:

python -m adforce.plotting [–test-single] [–output-name “track_summary.pdf”]

adforce.tc_plots.plot_all_tc_tracks_on_mesh(all_storms_ds, mesh_path, output_path, test_single=False)

Plots TC tracks from IBTrACS on the ADCIRC mesh.

Tracks are colored by wind intensity using the Saffir-Simpson scale.

Parameters:
  • all_storms_ds (xr.Dataset) – Dataset containing all storms from na_landing_tcs().

  • mesh_path (str) – Path to the fort.14 mesh file.

  • output_path (str) – Path to save the output .pdf plot.

  • test_single (bool, optional) – If True, plots only Katrina 2005. Defaults to False.

Return type:

None

adforce.time module

Some time conversion functions to deal with time for fort.22.nc.

adforce.time.datetime_to_time(dt, units, calendar)

Convert time in minutes to datetime objects.

We need to convert to the proleptic_gregorian calendar.

Parameters:
  • time (datetime) – datetime object.

  • units (str) – units of time.

  • calendar (str) – calendar type.

Returns:

minutes since 1990-01-01T01:00:00

Return type:

int

Examples::
>>> datetime_to_time(datetime.datetime(2004, 8, 9, 0, 0), "minutes since 1990-01-01T01:00:00", "proleptic_gregorian") == 7680900
True
adforce.time.str_to_datetime(time_str)

Convert time string to datetime object.

Parameters:

time_str (str) – time string in the format “%Y-%m-%dT%H:%M:%S”

Returns:

datetime object

Return type:

datetime.datetime

Examples::
>>> str_to_datetime("2004-08-08T23:00:00") == datetime.datetime(2004, 8, 8, 23, 0)
True
adforce.time.str_to_time(time_str, units, calendar)

Convert time string to time in minutes.

Parameters:
  • time_str (str) – time string in the format “%Y-%m-%dT%H:%M:%S”

  • units (str) – units of time.

  • calendar (str) – calendar type.

Returns:

time in minutes from 1990-01-01T01:00:00

Return type:

int

Examples::
>>> str_to_time("2004-08-09T00:00:00",  "minutes since 1990-01-01T01:00:00", "proleptic_gregorian" ) == 7680900
True
adforce.time.time_to_datetime(time, units, calendar)

Convert time in minutes to datetime objects.

We need to convert to the proleptic_gregorian calendar.

Parameters:
  • time (int) – time in minutes.

  • units (str) – units of time.

  • calendar (str) – calendar type.

Returns:

datetime object

Return type:

datetime.datetime

Examples::
>>> time_to_datetime(7680900, "minutes since 1990-01-01T01:00:00", "proleptic_gregorian") == datetime.datetime(2004, 8, 9, 0, 0)
True
adforce.time.unknown_to_time(unknown_t, units=None, calendar=None)

Convert unknown time to time in minutes.

Parameters:
  • unknown_t (Union[str, datetime.datetime, int]) – unknown time.

  • units (Optional[str]) – units of time.

  • calendar (Optional[str]) – calendar type.

Returns:

time in minutes from 1990-01-01T01:00:00

Return type:

int

Examples::
>>> unknown_to_time("2004-08-09T00:00:00", "minutes since 1990-01-01T01:00:00", "proleptic_gregorian") == 7680900
True
>>> unknown_to_time(datetime.datetime(2004, 8, 9, 0, 0), "minutes since 1990-01-01T01:00:00", "proleptic_gregorian") == 7680900
True
>>> unknown_to_time(7680900) == 7680900
True

adforce.wrap module

Wrap the adcirc call.

adforce.wrap.get_default_config()
Return type:

OmegaConf

adforce.wrap.idealized_tc_observe(cfg)

Wrap the adcirc call.

Parameters:

cfg (DictConfig) – configuration.

Returns:

max water level at observation point.

Return type:

float

adforce.wrap.main(cfg)

Command line call

Parameters:

cfg (DictConfig) – Full config file.

Returns:

float.

Return type:

float

adforce.wrap.observe_max_point(cfg)

Observe the ADCIRC model.

Return type:

float

Module contents