Source code for zacrostools.kmc_output

import os
import numpy as np
from typing import Union
from zacrostools.simulation_input import parse_simulation_input_file
from zacrostools.general_output import parse_general_output_file
from zacrostools.specnum_output import parse_specnum_output_file
from zacrostools.custom_exceptions import enforce_types, KMCOutputError, EnergeticsModelError


[docs] class KMCOutput: """ A class that represents a KMC (Kinetic Monte Carlo) simulation output. Attributes ---------- area : float Lattice surface area (in Ų). av_coverage : Dict[str, float] Average coverage of surface species (in %). Example: `KMCOutput.av_coverage['CO']`. av_coverage_per_site_type : Dict[str, Dict[str, float]] Average coverage of surface species per site type (in %). av_energy : float Average lattice energy (in eV·Å⁻²). av_total_coverage : float Average total coverage of surface species (in %). av_total_coverage_per_site_type : Dict[str, float] Average total coverage of surface species per site type (in %). coverage : Dict[str, np.ndarray] Coverage of surface species over time (in %). Example: `KMCOutput.coverage['CO']`. coverage_per_site_type : Dict[str, Dict[str, np.ndarray]] Coverage of surface species per site type over time (in %). cpu_time : float Final elapsed cpu time (in seconds). dominant_ads : str Most dominant surface species, used for plotting kinetic phase diagrams. dominant_ads_per_site_type : Dict[str, str] Most dominant surface species per site type, used for plotting kinetic phase diagrams. energy : np.ndarray Lattice energy (in eV·Å⁻²). final_energy : float Final lattice energy (in eV·Å⁻²). finaltime : float Final simulated time (in seconds). gas_specs_names : List[str] Gas species names. n_gas_species : int Number of gas species. n_sites : int Total number of lattice sites. n_surf_species : int Number of surface species. nevents : np.ndarray Number of events occurred. production : Dict[str, np.ndarray] Gas species produced over time. Example: `KMCOutput.production['CO']`. surf_specs_names : List[str] Surface species names. time : np.ndarray Simulated time (in seconds). tof : Dict[str, float] TOF (Turnover Frequency) of gas species (in molecules·s⁻¹·Å⁻²). Example: `KMCOutput.tof['CO2']`. total_coverage : np.ndarray Total coverage of surface species over time (in %). total_coverage_per_site_type : Dict[str, np.ndarray] Total coverage of surface species per site type over time (in %). Example: `KMCOutput.total_coverage_per_site_type['top']`. total_production : Dict[str, float] Total number of gas species produced. Example: `KMCOutput.total_production['CO']`. """ def __init__(self, job_path: str = None, analysis_range: Union[list, None] = None, range_type: str = 'time', weights: Union[str, None] = None, **kwargs): """ Initialize the KMCOutput object by parsing simulation output files. Parameters ---------- job_path : str, optional The path where the output files are located. (Previously named 'path'.) For backward compatibility, you can still pass this value using the keyword 'path'. analysis_range : List[float], optional A list of two elements `[start_percent, end_percent]` specifying the portion of the entire simulation to consider for analysis. The values should be between 0 and 100, representing percentages of the total simulated time or the total number of events, depending on `range_type`. For example, `[50, 100]` would analyze only the latter half of the simulation. Default is `[0.0, 100.0]`. range_type : str, optional Determines the dimension used when applying `analysis_range`: - `'time'`: The percentages in `analysis_range` refer to segments of the total simulated time. - `'nevents'`: The percentages in `analysis_range` refer to segments of the total number of simulated events. Default is `'time'`. weights : str, optional Weights for calculating weighted averages. Possible values are `'time'`, `'nevents'`, or `None`. If `None`, all weights are set to 1. Default value is `None`. """ # Support backward compatibility: if job_path is not provided, check for 'path' if job_path is None: job_path = kwargs.pop('path', None) if job_path is None: raise TypeError("Missing required argument: job_path (or 'path' for backwards compatibility)") self.job_path = job_path if analysis_range is None: analysis_range = [0.0, 100.0] # Parse relevant data from the simulation_input.dat file data_simulation = parse_simulation_input_file( input_file=f'{self.job_path}/simulation_input.dat') self.random_seed = data_simulation['random_seed'] self.temperature = data_simulation['temperature'] self.pressure = data_simulation['pressure'] self.n_gas_species = data_simulation['n_gas_species'] self.gas_specs_names = data_simulation['gas_specs_names'] self.gas_molar_fracs = data_simulation['gas_molar_fracs'] self.n_surf_species = data_simulation['n_surf_species'] self.surf_specs_names = data_simulation['surf_specs_names'] self.surf_specs_dent = data_simulation['surf_specs_dent'] # Parse relevant data from the general_output.txt file data_general = parse_general_output_file( output_file=f'{self.job_path}/general_output.txt') self.n_sites = data_general['n_sites'] self.area = data_general['area'] self.site_types = data_general['site_types'] self.cpu_time = data_general['cpu_time'] # Parse relevant data from the specnum_output.txt file data_specnum, header = parse_specnum_output_file( output_file=f'{self.job_path}/specnum_output.txt', analysis_range=analysis_range, range_type=range_type) self.nevents = data_specnum[:, 1] self.time = data_specnum[:, 2] self.final_time = data_specnum[-1, 2] self.energy = data_specnum[:, 4] / self.area # in eV/Å2 # If the energy is constant, avoid numerical noise in polyfit by setting slope to zero. if np.ptp(self.energy) > 1e-12: self.energyslope = abs(np.polyfit(self.nevents, self.energy, 1)[0]) # in eV/Ų/step else: self.energyslope = 0.0 self.final_energy = data_specnum[-1, 4] / self.area self.av_energy = self.get_average(array=self.energy, weights=weights) # Compute production and TOF self.production = {} # in molecules self.total_production = {} # useful when calculating selectivity (i.e., set min_total_production) self.tof = {} # in molecules·s⁻¹·Å⁻² for i in range(5 + self.n_surf_species, len(header)): gas_spec = header[i] self.production[gas_spec] = data_specnum[:, i] self.total_production[gas_spec] = data_specnum[-1, i] - data_specnum[0, i] gas_data = data_specnum[:, i] # Check if production is constant using the peak-to-peak difference. if len(gas_data) > 1 and np.ptp(gas_data) > 1e-18: slope = np.polyfit(data_specnum[:, 2], gas_data, 1)[0] # Force slope to zero if it is within a small tolerance. if np.isclose(slope, 0, atol=1e-18): slope = 0.0 self.tof[gas_spec] = slope / self.area else: self.tof[gas_spec] = 0.0 # Compute coverages (per total number of sites) surf_specs_data = get_surf_specs_data(job_path=self.job_path) self.coverage = {} self.av_coverage = {} for i in range(5, 5 + self.n_surf_species): surf_spec = header[i].replace('*', '') num_dentates = surf_specs_data[surf_spec]['surf_specs_dent'] self.coverage[surf_spec] = data_specnum[:, i] * num_dentates / self.n_sites * 100 self.av_coverage[surf_spec] = self.get_average(array=self.coverage[surf_spec], weights=weights) self.total_coverage = sum(self.coverage.values()) self.av_total_coverage = min(sum(self.av_coverage.values()), 100) # prevent numerical errors over 100% self.dominant_ads = max(self.av_coverage, key=self.av_coverage.get) # Compute partial coverages (per total number of sites of a given type) self.coverage_per_site_type = {} self.av_coverage_per_site_type = {} for site_type in self.site_types: self.coverage_per_site_type[site_type] = {} self.av_coverage_per_site_type[site_type] = {} for i in range(5, 5 + self.n_surf_species): surf_spec = header[i].replace('*', '') site_type = surf_specs_data[surf_spec]['site_type'] num_dentates = surf_specs_data[surf_spec]['surf_specs_dent'] self.coverage_per_site_type[site_type][surf_spec] = ( data_specnum[:, i] * num_dentates / self.site_types[site_type] * 100 ) self.av_coverage_per_site_type[site_type][surf_spec] = self.get_average( array=self.coverage_per_site_type[site_type][surf_spec], weights=weights ) self.total_coverage_per_site_type = {} self.av_total_coverage_per_site_type = {} self.dominant_ads_per_site_type = {} for site_type in self.site_types: if len(self.av_coverage_per_site_type[site_type]) > 0: self.total_coverage_per_site_type[site_type] = sum(self.coverage_per_site_type[site_type].values()) self.av_total_coverage_per_site_type[site_type] = min( sum(self.av_coverage_per_site_type[site_type].values()), 100) # prevent numerical errors over 100% self.dominant_ads_per_site_type[site_type] = max( self.av_coverage_per_site_type[site_type], key=self.av_coverage_per_site_type[site_type].get) else: # No species are adsorbed on this site type self.total_coverage_per_site_type[site_type] = np.zeros_like(self.time) self.av_total_coverage_per_site_type[site_type] = 0.0 self.dominant_ads_per_site_type[site_type] = None def get_average(self, array, weights): """ Calculate the average of an array with optional weighting. Parameters ---------- array : np.ndarray The array of values to average. weights : str or None The weights to apply when calculating the average. Possible values are: - `None`: No weighting; all weights are set to 1. - `'time'`: Weights based on the differences in simulated time. - `'nevents'`: Weights based on the differences in the number of events. Returns ------- float The calculated average value. """ if weights not in [None, 'time', 'nevents']: raise KMCOutputError(f"'weights' must be one of the following: 'none' (default), 'time', or 'nevents'.") if len(array) == 1: # If the catalyst is poisoned, it could be that the last ∆t is very high and the time window only # contains one row. In that case, do not compute the average return float(array[0]) else: if weights is None: return np.average(array) elif weights == 'time': return np.average(array[1:], weights=np.diff(self.time)) elif weights == 'nevents': return np.average(array[1:], weights=np.diff(self.nevents)) @enforce_types def get_selectivity(self, main_product: str, side_products: list): """ Calculate the selectivity of the main product over side products. Parameters ---------- main_product : str Name of the main product. side_products : List[str] Names of the side products. Returns ------- float The selectivity of the main product (in %) over the side products. Notes ----- The selectivity is calculated as: selectivity = (TOF_main_product / (TOF_main_product + sum(TOF_side_products))) * 100 If the total TOF is zero, the selectivity is returned as NaN. """ selectivity = float('NaN') tof_side_products = 0.0 for side_product in side_products: tof_side_products += self.tof[side_product] if self.tof[main_product] + tof_side_products != 0: selectivity = self.tof[main_product] / (self.tof[main_product] + tof_side_products) * 100 return selectivity
def get_surf_specs_data(job_path: str = None, **kwargs): """ Retrieve surface species data including the number of dentates and the associated site type for each species. Parameters ---------- job_path : str, optional The path to the directory containing the simulation input files (simulation_input.dat and energetics_input.dat). (Previously named 'path'.) For backward compatibility, you can pass 'path' instead. Returns ------- dict A dictionary mapping each surface species name to its data, including: - 'surf_specs_dent': int Number of dentates required by the species. - 'site_type': str The site type on which the species adsorbs. """ if job_path is None: job_path = kwargs.pop('path', None) if job_path is None: raise TypeError("Missing required argument: job_path (or 'path' for backwards compatibility)") # Get data from simulation_input.dat parsed_sim_data = parse_simulation_input_file(input_file=f"{job_path}/simulation_input.dat") surf_specs_names = parsed_sim_data.get('surf_specs_names') surf_specs_dent = parsed_sim_data.get('surf_specs_dent') species_dentates = dict(zip(surf_specs_names, surf_specs_dent)) species_in_simulation = set(surf_specs_names) surf_specs_data = {} # Check if the user is using a default lattice or not default_lattice = check_default_lattice(job_path=job_path) if default_lattice: for species in surf_specs_names: surf_specs_data[species] = { 'surf_specs_dent': species_dentates[species], 'site_type': 'StTp1' } else: with open(os.path.join(job_path, 'energetics_input.dat'), 'r') as f: lines = f.readlines() species_site_types = {} num_lines = len(lines) i = 0 while i < num_lines: line = lines[i].strip() if line.startswith('cluster'): cluster_species = [] site_types = [] i += 1 while i < num_lines: line = lines[i].strip() if line.startswith('end_cluster'): break elif line.startswith('lattice_state'): # Process lattice_state block i += 1 # Move to the next line after 'lattice_state' while i < num_lines: line = lines[i].strip() if not line or line.startswith('#'): i += 1 continue if line.startswith('site_types') or line.startswith('cluster_eng') or line.startswith( 'neighboring') or line.startswith('end_cluster'): break # End of lattice_state block tokens = line.split() if tokens and tokens[0].isdigit(): species_name = tokens[1].rstrip('*') cluster_species.append(species_name) i += 1 elif line.startswith('site_types'): tokens = line.split() site_types = tokens[1:] i += 1 # Move to the next line after 'site_types' continue # Continue to process other lines in the cluster else: i += 1 # After processing the cluster if len(cluster_species) != len(site_types): raise EnergeticsModelError(f"Mismatch between number of species and site_types in a cluster in " f"line {i + 1}." f"\nCluster species: {cluster_species}" f"\nSite types: {site_types}") # Associate species with site types for species, site_type in zip(cluster_species, site_types): if species not in species_in_simulation: raise EnergeticsModelError( f"Species '{species}' declared in energetics_input.dat but not in surf_specs_names.") if species in species_site_types: if species_site_types[species] != site_type: raise EnergeticsModelError( f"Species '{species}' is adsorbed on multiple site types: " f"'{species_site_types[species]}' and '{site_type}'") else: species_site_types[species] = site_type i += 1 # Move past 'end_cluster' else: i += 1 for species in surf_specs_names: if species not in species_site_types: raise EnergeticsModelError(f"Species '{species}' declared in surf_specs_names but not found in " f"energetics_input.dat.") surf_specs_data[species] = { 'surf_specs_dent': species_dentates[species], 'site_type': species_site_types[species] } return surf_specs_data def check_default_lattice(job_path: str = None, **kwargs): """ Check whether the simulation uses a default lattice configuration. Parameters ---------- job_path : str, optional The path to the directory containing the `lattice_input.dat` file. (Previously named 'path'.) For backward compatibility, you can pass 'path' instead. Returns ------- bool True if the `lattice_input.dat` file indicates a default lattice configuration, False otherwise. """ if job_path is None: job_path = kwargs.pop('path', None) if job_path is None: raise TypeError("Missing required argument: job_path (or 'path' for backwards compatibility)") with open(os.path.join(job_path, 'lattice_input.dat'), 'r') as file: for line in file: if 'lattice' in line and 'default_choice' in line: return True return False