from random import randint
from typing import Union, Optional
from pathlib import Path
from zacrostools.header import write_header
from zacrostools.lattice_model import LatticeModel
from zacrostools.energetics_model import EnergeticsModel
from zacrostools.reaction_model import ReactionModel
from zacrostools.gas_model import GasModel
from zacrostools.custom_exceptions import KMCModelError
from zacrostools.custom_exceptions import enforce_types
[docs]
class KMCModel:
"""
Represents a Kinetic Monte Carlo (KMC) model.
Parameters
----------
gas_model : GasModel
An instance containing information about the gas molecules.
reaction_model : ReactionModel
An instance containing information about the reaction model.
energetics_model : EnergeticsModel
An instance containing information about the energetic model.
lattice_model : LatticeModel
An instance containing information about the lattice model.
"""
@enforce_types
def __init__(self,
gas_model: GasModel,
reaction_model: ReactionModel,
energetics_model: EnergeticsModel,
lattice_model: LatticeModel):
self.job_dir: Optional[Path] = None
self.gas_model = gas_model
self.reaction_model = reaction_model
self.energetics_model = energetics_model
self.lattice_model = lattice_model
self.check_errors()
def check_errors(self):
"""
Check for data consistency after initialization.
Raises
------
KMCModelError
If there are inconsistencies in the model configurations.
"""
if self.lattice_model.lattice_type == 'default_choice':
if 'site_types' in self.reaction_model.df.columns:
raise KMCModelError("Remove 'site_types' from the reaction model when using a default lattice.")
if 'site_types' in self.energetics_model.df.columns:
raise KMCModelError("Remove 'site_types' from the energetic model when using a default lattice.")
else:
if 'site_types' not in self.reaction_model.df.columns:
raise KMCModelError("'site_types' are missing in the reaction model.")
if 'site_types' not in self.energetics_model.df.columns:
raise KMCModelError("'site_types' are missing in the energetic model.")
@enforce_types
def create_job_dir(self,
job_path: str,
temperature: Union[float, int],
pressure: dict,
reporting_scheme: Optional[dict] = None,
stopping_criteria: Optional[dict] = None,
manual_scaling: Optional[dict] = None,
stiffness_scaling_algorithm: Optional[str] = None,
stiffness_scalable_steps: Union[list, str, None] = None,
stiffness_scalable_symmetric_steps: Optional[list] = None,
stiffness_scaling_tags: Optional[dict] = None,
sig_figs_energies: int = 8,
sig_figs_pe: int = 8,
sig_figs_lattice: int = 8,
random_seed: Optional[int] = None,
version: Union[float, int] = 5.0):
"""
Create a job directory and write the necessary input files for the KMC simulation.
Parameters
----------
job_path : str
The path for the job directory where input files will be written.
temperature : float or int
Reaction temperature (in K).
pressure : dict
Partial pressures of all gas species (in bar), e.g., {'CO': 1.0, 'O2': 0.001}.
reporting_scheme : dict, optional
Reporting scheme in Zacros format. Must contain the following keys:
'snapshots', 'process_statistics', and 'species_numbers'.
Default is {'snapshots': 'on event 10000',
'process_statistics': 'on event 10000',
'species_numbers': 'on event 10000'}.
stopping_criteria : dict, optional
Stopping criteria in Zacros format. Must contain the following keys:
'max_steps', 'max_time', and 'wall_time'.
Default is {'max_steps': 'infinity',
'max_time': 'infinity',
'wall_time': 86400}.
manual_scaling : dict, optional
Dictionary mapping step names to their corresponding manual scaling factors, e.g.,
{'CO_diffusion': 1.0e-1, 'O_diffusion': 1.0e-2}. Default is {}.
stiffness_scaling_algorithm : str, optional
Algorithm used for stiffness scaling. Allowed values are None (default), 'legacy', or 'prats2024'.
stiffness_scalable_steps : list of str or 'all', optional
Steps that will be marked as 'stiffness_scalable' in mechanism_input.dat.
Can be provided as a list of step names or the string 'all' to indicate that all steps
(except those specified in stiffness_scalable_symmetric_steps) are stiffness scalable.
Default is [].
stiffness_scalable_symmetric_steps : list of str, optional
Steps that will be marked as 'stiffness_scalable_symmetric' in mechanism_input.dat.
Default is [].
stiffness_scaling_tags : dict, optional
Keywords controlling the dynamic stiffness scaling algorithm and their corresponding values, e.g.,
{'check_every': 500, 'min_separation': 400.0, ...}.
The correct types are: integer for 'check_every' and 'min_noccur', and float for all others.
Default is {}
sig_figs_energies : int, optional
Number of significant figures for energy values in input files.
Default is 8.
sig_figs_pe : int, optional
Number of significant figures for pre-exponential factors in mechanism_input.dat.
Default is 8.
sig_figs_lattice : int, optional
Number of significant figures for coordinates in lattice_input.dat.
Default is 8.
random_seed : int, optional
The seed for the random number generator. If not specified, a random seed will be generated.
Default is None.
version : float or int, optional
The Zacros version. Can be a single integer (e.g. 4) or float (e.g. 4.2 or 5.1).
"""
# Parse and validate parameters
parsed_params = self._parse_parameters(
reporting_scheme=reporting_scheme,
stopping_criteria=stopping_criteria,
manual_scaling=manual_scaling,
stiffness_scaling_algorithm=stiffness_scaling_algorithm,
stiffness_scalable_steps=stiffness_scalable_steps,
stiffness_scalable_symmetric_steps=stiffness_scalable_symmetric_steps,
stiffness_scaling_tags=stiffness_scaling_tags,
version=version
)
# Unpack parsed parameters
reporting_scheme = parsed_params['reporting_scheme']
stopping_criteria = parsed_params['stopping_criteria']
manual_scaling = parsed_params['manual_scaling']
stiffness_scaling_algorithm = parsed_params['stiffness_scaling_algorithm']
stiffness_scalable_steps = parsed_params['stiffness_scalable_steps']
stiffness_scalable_symmetric_steps = parsed_params['stiffness_scalable_symmetric_steps']
stiffness_scaling_tags = parsed_params['stiffness_scaling_tags']
self.job_dir = Path(job_path)
if not self.job_dir.exists():
self.job_dir.mkdir(parents=True, exist_ok=True)
self.write_simulation_input(
temperature=temperature,
pressure=pressure,
reporting_scheme=reporting_scheme,
stopping_criteria=stopping_criteria,
stiffness_scaling_algorithm=stiffness_scaling_algorithm,
stiffness_scalable_steps=stiffness_scalable_steps,
stiffness_scalable_symmetric_steps=stiffness_scalable_symmetric_steps,
stiffness_scaling_tags=stiffness_scaling_tags,
sig_figs_energies=sig_figs_energies,
random_seed=random_seed,
version=version)
self.reaction_model.write_mechanism_input(
output_dir=self.job_dir,
temperature=temperature,
gas_model=self.gas_model,
manual_scaling=manual_scaling,
stiffness_scalable_steps=stiffness_scalable_steps,
stiffness_scalable_symmetric_steps=stiffness_scalable_symmetric_steps,
sig_figs_energies=sig_figs_energies,
sig_figs_pe=sig_figs_pe)
self.energetics_model.write_energetics_input(
output_dir=self.job_dir,
sig_figs_energies=sig_figs_energies)
self.lattice_model.write_lattice_input(
output_dir=self.job_dir,
sig_figs=sig_figs_lattice)
else:
print(f'{self.job_dir} already exists (nothing done)')
def _parse_parameters(self,
reporting_scheme,
stopping_criteria,
manual_scaling,
stiffness_scaling_algorithm,
stiffness_scalable_steps,
stiffness_scalable_symmetric_steps,
stiffness_scaling_tags,
version):
"""
Parse and validate the parameters provided to create_job_dir.
"""
# [Reporting scheme and stopping criteria processing as before...]
allowed_reporting_keys = {'snapshots', 'process_statistics', 'species_numbers'}
default_reporting_scheme = {
'snapshots': 'on event 10000',
'process_statistics': 'on event 10000',
'species_numbers': 'on event 10000'
}
if reporting_scheme is None:
reporting_scheme = default_reporting_scheme
else:
reporting_scheme = {key: reporting_scheme.get(key, default_reporting_scheme[key])
for key in allowed_reporting_keys}
allowed_stopping_keys = {'max_steps', 'max_time', 'wall_time'}
default_stopping_criteria = {
'max_steps': 'infinity',
'max_time': float(1.0e+10),
'wall_time': 86400
}
if stopping_criteria is None:
stopping_criteria = default_stopping_criteria
else:
stopping_criteria = {key: stopping_criteria.get(key, default_stopping_criteria[key])
for key in allowed_stopping_keys}
# Validate 'max_steps', 'max_time' and 'wall_time'
if 'max_steps' in stopping_criteria:
ms = stopping_criteria['max_steps']
if ms != 'infinity' and not isinstance(ms, int):
raise KMCModelError(
f"'max_steps' must be either 'infinity' or an integer, got {ms}."
)
if 'max_time' in stopping_criteria:
mt = stopping_criteria['max_time']
if mt != 'infinity' and not isinstance(mt, float):
raise KMCModelError(
f"'max_time' must be either 'infinity' or a float, got {mt}."
)
if 'wall_time' in stopping_criteria:
wt = stopping_criteria['wall_time']
if not isinstance(wt, int):
raise KMCModelError(
f"'wall_time' must be an integer, got {wt}."
)
# Manual scaling
if manual_scaling is None:
manual_scaling = {}
# --- Version-specific stiffness scaling validation ---
if stiffness_scalable_steps is None:
stiffness_scalable_steps = []
if stiffness_scalable_symmetric_steps is None:
stiffness_scalable_symmetric_steps = []
if stiffness_scaling_tags is None:
stiffness_scaling_tags = {}
if float(version) < 3.0:
if (stiffness_scaling_algorithm is not None or
(stiffness_scalable_steps not in (None, []) and len(stiffness_scalable_steps) > 0) or
(stiffness_scalable_symmetric_steps not in (None, []) and len(
stiffness_scalable_symmetric_steps) > 0) or
(stiffness_scaling_tags not in (None, {}) and len(stiffness_scaling_tags) > 0)):
raise KMCModelError("Stiffness scaling is not implemented for Zacros versions lower than 3.")
elif 3.0 <= float(version) < 5.0:
if stiffness_scaling_algorithm is not None:
print(
"Warning: 'stiffness_scaling_algorithm' parameter is not allowed for Zacros versions lower than 5. Ignoring it.")
stiffness_scaling_algorithm = None
if stiffness_scalable_symmetric_steps not in (None, []) and len(stiffness_scalable_symmetric_steps) > 0:
print(
"Warning: 'stiffness_scalable_symmetric_steps' parameter is not allowed for Zacros versions lower than 5. Ignoring it.")
stiffness_scalable_symmetric_steps = []
else:
allowed_scaling_algorithms = {'legacy', 'prats2024'}
if stiffness_scaling_algorithm is not None:
if stiffness_scaling_algorithm not in allowed_scaling_algorithms:
raise KMCModelError(
f"Invalid stiffness_scaling_algorithm '{stiffness_scaling_algorithm}'. "
f"Allowed values are 'legacy' or 'prats2024'."
)
if stiffness_scaling_algorithm is None:
if stiffness_scalable_steps or stiffness_scalable_symmetric_steps or stiffness_scaling_tags:
stiffness_scaling_algorithm = 'legacy'
else:
stiffness_scaling_algorithm = None
if float(version) >= 5.0:
if stiffness_scaling_algorithm in {'legacy', 'prats2024'}:
if not stiffness_scalable_steps and not stiffness_scalable_symmetric_steps:
raise KMCModelError(
"stiffness_scaling_algorithm selected but no steps are stiffness scalable."
)
# Validate stiffness scaling tags
if float(version) >= 5.0:
if stiffness_scaling_algorithm == 'legacy':
allowed_tags = {
'check_every',
'min_separation',
'max_separation',
'max_qequil_separation',
'tol_part_equil_ratio',
'stiffn_coeff_threshold',
'scaling_factor'
}
elif stiffness_scaling_algorithm == 'prats2024':
allowed_tags = {
'check_every',
'min_separation',
'max_separation',
'tol_part_equil_ratio',
'upscaling_factor',
'upscaling_limit',
'downscaling_limit',
'min_noccur'
}
else:
allowed_tags = set()
else:
allowed_tags = {
'check_every',
'min_separation',
'max_separation',
'max_qequil_separation',
'tol_part_equil_ratio',
'stiffn_coeff_threshold',
'scaling_factor'
}
if stiffness_scaling_tags:
invalid_tags = set(stiffness_scaling_tags.keys()) - allowed_tags
if invalid_tags:
raise KMCModelError(
f"Invalid stiffness_scaling_tags keys for Zacros version {version}: "
f"{invalid_tags}. Allowed keys are: {allowed_tags}."
)
for tag, value in stiffness_scaling_tags.items():
if tag in ['check_every', 'min_noccur']:
if not isinstance(value, int):
raise KMCModelError(
f"Invalid type for stiffness_scaling_tags '{tag}': expected int, got {type(value).__name__}."
)
else:
if not isinstance(value, float):
raise KMCModelError(
f"Invalid type for stiffness_scaling_tags '{tag}': expected float, got {type(value).__name__}."
)
return {
'reporting_scheme': reporting_scheme,
'stopping_criteria': stopping_criteria,
'manual_scaling': manual_scaling,
'stiffness_scaling_algorithm': stiffness_scaling_algorithm,
'stiffness_scalable_steps': stiffness_scalable_steps,
'stiffness_scalable_symmetric_steps': stiffness_scalable_symmetric_steps,
'stiffness_scaling_tags': stiffness_scaling_tags
}
def write_simulation_input(self, temperature, pressure, reporting_scheme, stopping_criteria,
stiffness_scaling_algorithm, stiffness_scalable_steps,
stiffness_scalable_symmetric_steps, stiffness_scaling_tags,
sig_figs_energies, random_seed, version):
"""Writes the simulation_input.dat file."""
gas_specs_names = list(self.gas_model.df.index)
surf_specs = self.get_surf_specs()
write_header(f"{self.job_dir}/simulation_input.dat")
try:
with open(f"{self.job_dir}/simulation_input.dat", 'a') as infile:
# Handle random seed
if random_seed is None:
infile.write('random_seed\t'.expandtabs(26) + str(randint(100000, 999999)) + '\n')
else:
infile.write('random_seed\t'.expandtabs(26) + str(random_seed) + '\n')
# Write temperature
infile.write('temperature\t'.expandtabs(26) + str(float(temperature)) + '\n')
# Write total pressure
p_tot = sum(pressure.values())
infile.write('pressure\t'.expandtabs(26) + str(float(p_tot)) + '\n')
# Write number of gas species and their names
infile.write('n_gas_species\t'.expandtabs(26) + str(len(gas_specs_names)) + '\n')
infile.write('gas_specs_names\t'.expandtabs(26) + " ".join(str(x) for x in gas_specs_names) + '\n')
# Write gas energies and molecular weights
tags_dict = ['gas_energy', 'gas_molec_weight']
tags_zacros = ['gas_energies', 'gas_molec_weights']
for tag1, tag2 in zip(tags_dict, tags_zacros):
tag_list = [self.gas_model.df.loc[x, tag1] for x in gas_specs_names]
if tag1 == 'gas_energy':
formatted_tag_list = [f'{x:.{sig_figs_energies}f}' for x in tag_list]
infile.write(f'{tag2}\t'.expandtabs(26) + " ".join(formatted_tag_list) + '\n')
else:
infile.write(f'{tag2}\t'.expandtabs(26) + " ".join(str(x) for x in tag_list) + '\n')
# Write gas molar fractions
try:
gas_molar_frac_list = [pressure[x] / p_tot for x in gas_specs_names]
except KeyError as ke:
print(f"Key not found in 'pressure' dictionary: {ke}")
print(f"When calling KMCModel.create_job_dir(), 'pressure' dictionary must contain the names of all "
f"gas species ")
gas_molar_frac_list = [0.0 for _ in gas_specs_names] # Assign zero fractions for missing species
infile.write(f'gas_molar_fracs\t'.expandtabs(26) + " ".join(str(x) for x in gas_molar_frac_list) + '\n')
# Write number of surface species and their names and dentates
infile.write('n_surf_species\t'.expandtabs(26) + str(len(surf_specs)) + '\n')
infile.write('surf_specs_names\t'.expandtabs(26) + " ".join(str(x) for x in surf_specs.keys()) + '\n')
infile.write('surf_specs_dent\t'.expandtabs(26) + " ".join(str(x) for x in surf_specs.values()) + '\n')
# Write reporting scheme
for tag in ['snapshots', 'process_statistics', 'species_numbers']:
infile.write((tag + '\t').expandtabs(26) + str(reporting_scheme.get(tag, '')) + '\n')
# Write stopping criteria
for tag in ['max_steps', 'max_time', 'wall_time']:
infile.write((tag + '\t').expandtabs(26) + str(stopping_criteria.get(tag, '')) + '\n')
# Handle stiffness scaling based on version:
if float(version) >= 5.0:
if stiffness_scalable_steps or stiffness_scalable_symmetric_steps:
if stiffness_scaling_algorithm is None:
infile.write("enable_stiffness_scaling\n")
else:
infile.write('enable_stiffness_scaling\t'.expandtabs(26) + stiffness_scaling_algorithm + '\n')
for tag in stiffness_scaling_tags:
infile.write((tag + '\t').expandtabs(26) + str(stiffness_scaling_tags[tag]) + '\n')
else:
if stiffness_scalable_steps:
infile.write("enable_stiffness_scaling\n")
for tag in stiffness_scaling_tags:
infile.write((tag + '\t').expandtabs(26) + str(stiffness_scaling_tags[tag]) + '\n')
infile.write(f"finish\n")
except IOError as e:
raise KMCModelError(f"Failed to write to 'simulation_input.dat': {e}")
def get_surf_specs(self):
"""
Identify all surface species and their corresponding dentates from the `energetics_model` DataFrame.
Used to write `'surf_specs_names'` and `'surf_specs_dent'` in the `simulation_input.dat` file.
Returns
-------
dict
A dictionary with surface species names as keys and dentates as values.
Raises
------
KMCModelError
If the `lattice_state` format is invalid.
"""
surf_specs = {}
for cluster in self.energetics_model.df.index:
lattice_state = self.energetics_model.df.loc[cluster, 'lattice_state']
for site in lattice_state:
# Assuming the format is '1 CO* 1' or similar
parts = site.split()
if len(parts) >= 3:
surf_specs_name = parts[1]
try:
surf_specs_dent = int(parts[2])
except ValueError:
raise KMCModelError(
f"Invalid dentate value in lattice_state for cluster '{cluster}': {parts[2]}")
if surf_specs_name not in surf_specs or (
surf_specs_name in surf_specs and surf_specs_dent > surf_specs[surf_specs_name]):
surf_specs[surf_specs_name] = surf_specs_dent
else:
raise KMCModelError(
f"Invalid lattice_state format for cluster '{cluster}': {site}")
return surf_specs