#===============================================================================
# pyvale: the python validation engine
# License: MIT
# Copyright (C) 2025 The Computer Aided Validation Team
#===============================================================================
from pathlib import Path
from dataclasses import dataclass
import enum
import numpy as np
import pyvale.mooseherder as mh
[docs]
@enum.unique
class ESaveArray(enum.Enum):
    """File format selector used when writing an array to disk.

    Members
    -------
    NPY
        Binary numpy (``.npy``) output.
    TXT
        Delimited plain text output.
    BOTH
        Write the array in both of the above formats.
    """

    NPY = enum.auto()
    TXT = enum.auto()
    BOTH = enum.auto()
[docs]
def save_array(save_file: Path,
               data: np.ndarray,
               save_format: ESaveArray,
               txt_header: str = "",
               txt_delimiter: str = ",",
               txt_ext: str = ".csv"
               ) -> None:
    """Wrapper function to save a numpy array to disk in binary npy, delimited
    plain text or both formats.

    Parameters
    ----------
    save_file : Path
        Path including file name to save the numpy arrays to. The suffix of
        this path is replaced with `txt_ext` for text output and with ".npy"
        for binary output.
    data : np.ndarray
        Array to save to disk.
    save_format : ESaveArray
        Enumeration specifying to save the array in binary numpy, delimited
        plain text or both formats.
    txt_header : str, optional
        String specifying the headers for text files, by default "".
    txt_delimiter : str, optional
        Delimiter for text file array output, by default ","
    txt_ext : str, optional
        Extension for text file output, by default ".csv"

    Raises
    ------
    FileExistsError
        The parent directory where the array files are to be saved does not
        exist.
    """
    if not save_file.parent.exists():
        raise FileExistsError(f"Parent directory: {save_file.parent.resolve()},"
                              + " to save numpy array does not exist.")

    if save_format in (ESaveArray.TXT, ESaveArray.BOTH):
        np.savetxt(
            save_file.with_suffix(txt_ext),
            data,
            delimiter=txt_delimiter,
            header=txt_header,
            comments="",  # Removes '#' in header
        )

    # Binary output must key on NPY (not TXT): otherwise NPY writes nothing
    # and TXT unexpectedly produces an extra .npy file.
    if save_format in (ESaveArray.NPY, ESaveArray.BOTH):
        np.save(save_file.with_suffix(".npy"), data)
[docs]
@enum.unique
class ESaveFieldOpt(enum.Enum):
    """Layout selector for physics fields written to disk.

    Members
    -------
    BY_TIME
        One array per time step; the first dimension is the nodal
        coordinates and the second dimension is the field component.
    BY_FIELD
        One array per nodal field; the first dimension is the coordinate and
        the second dimension is the time step.
    BOTH
        Write the fields in both of the above layouts.
    """

    BY_TIME = enum.auto()
    BY_FIELD = enum.auto()
    BOTH = enum.auto()
[docs]
@dataclass(slots=True)
class SimDataSaveOpts:
    """Options for saving sim data objects to disk.

    Each ``get_*_name`` method returns a file name stem (no extension). When
    'sim_tag' is non-empty the stem is prefixed with ``"<sim_tag>_"``,
    otherwise the bare name is returned.
    """

    fields_save_by: ESaveFieldOpt = ESaveFieldOpt.BY_TIME
    """Enumeration specifying the data structure for the physics fields.
    """

    array_format: ESaveArray = ESaveArray.TXT
    """Enumeration specifying the file format to save the output files in.
    """

    sim_tag: str = ""
    """String tag that will appear as a prefix to all saved output file names.
    """

    coords_name: str = "coords"
    """String that will be used after the 'sim_tag' prefix for the coordinates
    file.
    """

    connect_name: str = "connect"
    """String that will be used after the 'sim_tag' prefix for the connectivity
    table file names. Note that there will be one connectivity table per mesh
    and these will be labelled 'connect_nameX' where X is an integer.
    """

    time_name: str = "time"
    """String that will be used after the 'sim_tag' prefix for the time step
    data file.
    """

    glob_name: str = "glob"
    """String that will be used after the 'sim_tag' prefix for the global
    variable output file.
    """

    node_field_name: str = "node_field"
    """String that will be used after the 'sim_tag' prefix for the output node
    field variable files.
    """

    elem_field_name: str = "elem_field"
    """String that will be used after the 'sim_tag' prefix for the output
    element field variable files.
    """

    def _with_tag(self, name: str) -> str:
        """Prefixes `name` with 'sim_tag' and an underscore, or returns `name`
        unchanged when 'sim_tag' is empty.
        """
        if not self.sim_tag:
            return name
        return f"{self.sim_tag}_{name}"

    def get_coord_name(self) -> str:
        """Assembles the file name for the coordinates. If the 'sim_tag' prefix
        is empty it just returns the specified string name for the coordinate
        file.

        Returns
        -------
        str
            Assembled filename for the coordinates.
        """
        return self._with_tag(self.coords_name)

    def get_connect_name_by_key(self, key: str) -> str:
        """Assembles the connectivity file name using the connectivity
        dictionary key taken from the `SimData` object. The 'connect_name'
        option is not used here; the key itself is the base name.

        Parameters
        ----------
        key : str
            String key from the connectivity dictionary in the `SimData` object.

        Returns
        -------
        str
            Assembled file name for the specified connectivity table.
        """
        return self._with_tag(key)

    def get_connect_name_by_block(self, block: int) -> str:
        """Assembles the connectivity file name using the specified block and
        connectivity name.

        Parameters
        ----------
        block : int
            Integer to identify the connectivity table.

        Returns
        -------
        str
            Assembled file name for the specified connectivity table.
        """
        return self._with_tag(f"{self.connect_name}{block}")

    def get_time_name(self) -> str:
        """Assembles the file name for the time steps.

        Returns
        -------
        str
            Assembled file name for the simulation time steps.
        """
        return self._with_tag(self.time_name)

    def get_glob_name(self) -> str:
        """Assembles the file name for the global output variables.

        Returns
        -------
        str
            Assembled file name for the global simulation variables.
        """
        return self._with_tag(self.glob_name)

    def get_node_field_name(self) -> str:
        """Assembles the file name for nodal field variables.

        Returns
        -------
        str
            Assembled file name for nodal field variables.
        """
        return self._with_tag(self.node_field_name)

    def get_elem_field_name(self, block: int) -> str:
        """Assembles the file name for an element field variable.

        Parameters
        ----------
        block : int
            Block identifying which connectivity table the elements belong to.

        Returns
        -------
        str
            Assembled file name for the element field variable.
        """
        return self._with_tag(f"{self.elem_field_name}_block{block}")
[docs]
def save_sim_data_to_arrays(output_path: Path,
                            sim_data: mh.SimData,
                            save_opts: SimDataSaveOpts | None = None) -> None:
    """Saves the simulation data to a series of output files in delimited plain
    text and/or binary numpy arrays.

    Any attribute of `sim_data` that is None is skipped. Field variables are
    written per field, per time step (frame) or both, according to
    `save_opts.fields_save_by`.

    Parameters
    ----------
    output_path : Path
        Path to the directory where the simulation files will be saved.
    sim_data : mh.SimData
        Simulation data object containing the data to save to disk.
    save_opts : SimDataSaveOpts | None, optional
        Options for how the simulation data should be saved, by default None
        in which case a default `SimDataSaveOpts` is used.

    Raises
    ------
    FileExistsError
        The specified output Path is not a directory.
    """
    if not output_path.is_dir():
        raise FileExistsError(f"Output directory: {output_path.resolve()}"
                              + ", is not a directory.")

    if save_opts is None:
        save_opts = SimDataSaveOpts()

    save_by_field = save_opts.fields_save_by in (ESaveFieldOpt.BY_FIELD,
                                                 ESaveFieldOpt.BOTH)
    save_by_time = save_opts.fields_save_by in (ESaveFieldOpt.BY_TIME,
                                                ESaveFieldOpt.BOTH)

    if sim_data.coords is not None:
        save_array(output_path / save_opts.get_coord_name(),
                   sim_data.coords,
                   save_format=save_opts.array_format,
                   txt_header="coord_x,coord_y,coord_z")

    if sim_data.connect is not None:
        for cc in sim_data.connect:
            save_array(output_path / save_opts.get_connect_name_by_key(cc),
                       sim_data.connect[cc],
                       save_format=save_opts.array_format,
                       txt_header="")

    if sim_data.time is not None:
        save_array(output_path / save_opts.get_time_name(),
                   sim_data.time,
                   save_format=save_opts.array_format,
                   txt_header="time,")

    if sim_data.glob_vars is not None:
        glob_keys = list(sim_data.glob_vars.keys())
        glob_header = ",".join(glob_keys)

        # One row per time step, one column per global variable.
        times_num = sim_data.time.shape[0]
        glob_data = np.zeros((times_num, len(glob_keys)))
        for ii, gg in enumerate(glob_keys):
            glob_data[:, ii] = sim_data.glob_vars[gg]

        save_array(output_path / save_opts.get_glob_name(),
                   glob_data,
                   save_format=save_opts.array_format,
                   txt_header=glob_header)

    if sim_data.node_vars is not None:
        node_keys = list(sim_data.node_vars.keys())
        node_header = ",".join(node_keys)

        if save_by_field:
            for nn in node_keys:
                save_file = save_opts.get_node_field_name() + f"_{nn}"
                save_array(output_path / save_file,
                           sim_data.node_vars[nn],
                           save_format=save_opts.array_format)

        if save_by_time:
            nodes_num = sim_data.coords.shape[0]
            times_num = sim_data.time.shape[0]
            # Zero pad the frame index so the files sort in time order.
            width = len(str(times_num))

            for tt in range(times_num):
                # One row per node, one column per nodal field.
                frame_data = np.zeros((nodes_num, len(node_keys)),
                                      dtype=np.float64)
                for ii, nn in enumerate(node_keys):
                    frame_data[:, ii] = sim_data.node_vars[nn][:, tt]

                frame_str = str(tt).zfill(width)
                save_file = (save_opts.get_node_field_name()
                             + f"_frame{frame_str}")
                save_array(output_path / save_file,
                           frame_data,
                           save_format=save_opts.array_format,
                           txt_header=node_header)

    if sim_data.elem_vars is not None:
        # Element variables are keyed by a (field name, block) pair.
        if save_by_field:
            for (ff, bb), data in sim_data.elem_vars.items():
                save_file = save_opts.get_elem_field_name(bb) + f"_{ff}"
                save_array(output_path / save_file,
                           data,
                           save_format=save_opts.array_format)

        if save_by_time:
            # Regroup as block -> {field name -> data} and collect the field
            # names in first-seen order to fix the column layout.
            elem_vars = {}
            elem_keys = []
            for (ff, bb), data in sim_data.elem_vars.items():
                elem_vars.setdefault(bb, {})[ff] = data
                if ff not in elem_keys:
                    elem_keys.append(ff)

            elem_header = ",".join(elem_keys)
            times_num = sim_data.time.shape[0]
            fields_num = len(elem_keys)
            width = len(str(times_num))

            for bb, block_fields in elem_vars.items():
                # Data is indexed [:, tt] below, so its first axis sets the
                # row count. NOTE(review): assumes every field in a block has
                # the same number of elements - confirm against SimData.
                elems_num = next(iter(block_fields.values())).shape[0]

                for tt in range(times_num):
                    # One row per element, one column per element field. A
                    # field missing from this block leaves a column of zeros.
                    frame_data = np.zeros((elems_num, fields_num),
                                          dtype=np.float64)
                    for ff, data in block_fields.items():
                        frame_data[:, elem_keys.index(ff)] = data[:, tt]

                    # No extension here: save_array sets the suffix itself.
                    frame_str = str(tt).zfill(width)
                    save_file = (save_opts.get_elem_field_name(bb)
                                 + f"_frame{frame_str}")
                    save_array(output_path / save_file,
                               frame_data,
                               save_format=save_opts.array_format,
                               txt_header=elem_header)