Source code for zacrostools.kmc_output

import os
import numpy as np
from typing import Union
from zacrostools.simulation_input import parse_simulation_input_file
from zacrostools.general_output import parse_general_output_file
from zacrostools.specnum_output import parse_specnum_output_file
from zacrostools.custom_exceptions import enforce_types, KMCOutputError, EnergeticsModelError


[docs] class KMCOutput: """ A class that represents a KMC (Kinetic Monte Carlo) simulation output. Attributes ---------- area : float Lattice surface area (in Ų). av_coverage : Dict[str, float] Average coverage of surface species (in %). Example: `KMCOutput.av_coverage['CO']`. av_coverage_per_site_type : Dict[str, Dict[str, float]] Average coverage of surface species per site type (in %). av_energy : float Average lattice energy (in eV·Å⁻²). av_total_coverage : float Average total coverage of surface species (in %). av_total_coverage_per_site_type : Dict[str, float] Average total coverage of surface species per site type (in %). coverage : Dict[str, np.ndarray] Coverage of surface species over time (in %). Example: `KMCOutput.coverage['CO']`. coverage_per_site_type : Dict[str, Dict[str, np.ndarray]] Coverage of surface species per site type over time (in %). cpu_time : float Final elapsed cpu time (in seconds). dominant_ads : str Most dominant surface species, used for plotting kinetic phase diagrams. dominant_ads_per_site_type : Dict[str, str] Most dominant surface species per site type, used for plotting kinetic phase diagrams. energy : np.ndarray Lattice energy (in eV·Å⁻²). final_energy : float Final lattice energy (in eV·Å⁻²). finaltime : float Final simulated time (in seconds). gas_specs_names : List[str] Gas species names. n_gas_species : int Number of gas species. n_sites : int Total number of lattice sites. n_surf_species : int Number of surface species. nevents : np.ndarray Number of events occurred. production : Dict[str, np.ndarray] Gas species produced over time. Example: `KMCOutput.production['CO']`. surf_specs_names : List[str] Surface species names. time : np.ndarray Simulated time (in seconds). tof : Dict[str, float] TOF (Turnover Frequency) of gas species (in molecules·s⁻¹·Å⁻²). Example: `KMCOutput.tof['CO2']`. total_coverage : np.ndarray Total coverage of surface species over time (in %). total_coverage_per_site_type : Dict[str, np.ndarray] Total coverage of surface species per site type over time (in %). Example: `KMCOutput.total_coverage_per_site_type['top']`. total_production : Dict[str, float] Total number of gas species produced. Example: `KMCOutput.total_production['CO']`. """ def __init__(self, job_path: str = None, analysis_range: Union[list, None] = None, range_type: str = 'time', weights: Union[str, None] = None, **kwargs): """ Initialize the KMCOutput object by parsing simulation output files. Parameters ---------- job_path : str, optional The path where the output files are located. (Previously named 'path'.) For backward compatibility, you can still pass this value using the keyword 'path'. analysis_range : List[float], optional A list of two elements `[start_percent, end_percent]` specifying the portion of the entire simulation to consider for analysis. The values should be between 0 and 100, representing percentages of the total simulated time or the total number of events, depending on `range_type`. For example, `[50, 100]` would analyze only the latter half of the simulation. Default is `[0.0, 100.0]`. range_type : str, optional Determines the dimension used when applying `analysis_range`: - `'time'`: The percentages in `analysis_range` refer to segments of the total simulated time. - `'nevents'`: The percentages in `analysis_range` refer to segments of the total number of simulated events. Default is `'time'`. weights : str, optional Weights for calculating weighted averages. Possible values are `'time'`, `'nevents'`, or `None`. If `None`, all weights are set to 1. Default value is `None`. """ # Support backward compatibility: if job_path is not provided, check for 'path' if job_path is None: job_path = kwargs.pop('path', None) if job_path is None: raise TypeError("Missing required argument: job_path (or 'path' for backwards compatibility)") self.job_path = job_path if analysis_range is None: analysis_range = [0.0, 100.0] # Parse relevant data from the simulation_input.dat file data_simulation = parse_simulation_input_file( input_file=f'{self.job_path}/simulation_input.dat') self.random_seed = data_simulation['random_seed'] self.temperature = data_simulation['temperature'] self.pressure = data_simulation['pressure'] self.n_gas_species = data_simulation['n_gas_species'] self.gas_specs_names = data_simulation['gas_specs_names'] self.gas_molar_fracs = data_simulation['gas_molar_fracs'] self.n_surf_species = data_simulation['n_surf_species'] self.surf_specs_names = data_simulation['surf_specs_names'] self.surf_specs_dent = data_simulation['surf_specs_dent'] # Parse relevant data from the general_output.txt file data_general = parse_general_output_file( output_file=f'{self.job_path}/general_output.txt') self.n_sites = data_general['n_sites'] self.area = data_general['area'] self.site_types = data_general['site_types'] self.cpu_time = data_general['cpu_time'] # Parse relevant data from the specnum_output.txt file data_specnum, header = parse_specnum_output_file( output_file=f'{self.job_path}/specnum_output.txt', analysis_range=analysis_range, range_type=range_type) self.nevents = data_specnum[:, 1] self.time = data_specnum[:, 2] self.final_time = data_specnum[-1, 2] self.energy = data_specnum[:, 4] / self.area # in eV/Å2 # If the energy is constant, avoid numerical noise in polyfit by setting slope to zero. if np.ptp(self.energy) > 1e-12: self.energyslope = abs(np.polyfit(self.nevents, self.energy, 1)[0]) # in eV/Ų/step else: self.energyslope = 0.0 self.final_energy = data_specnum[-1, 4] / self.area self.av_energy = self.get_average(array=self.energy, weights=weights) # Compute production and TOF self.production = {} # in molecules self.total_production = {} # useful when calculating selectivity (i.e., set min_total_production) self.tof = {} # in molecules·s⁻¹·Å⁻² for i in range(5 + self.n_surf_species, len(header)): gas_spec = header[i] self.production[gas_spec] = data_specnum[:, i] self.total_production[gas_spec] = data_specnum[-1, i] - data_specnum[0, i] gas_data = data_specnum[:, i] # Check if production is constant using the peak-to-peak difference. if len(gas_data) > 1 and np.ptp(gas_data) > 1e-18: slope = np.polyfit(data_specnum[:, 2], gas_data, 1)[0] # Force slope to zero if it is within a small tolerance. if np.isclose(slope, 0, atol=1e-18): slope = 0.0 self.tof[gas_spec] = slope / self.area else: self.tof[gas_spec] = 0.0 # Compute coverages (per total number of sites) dent_sites_map = get_dentate_types(job_path=self.job_path) self.coverage = {} self.av_coverage = {} for i in range(5, 5 + self.n_surf_species): surf_spec = header[i] dent_sites = dent_sites_map[surf_spec] total_dentates = len(dent_sites) # total occupied sites = #molecules * #dentates per molecule self.coverage[surf_spec] = data_specnum[:, i] * total_dentates / self.n_sites * 100 self.av_coverage[surf_spec] = self.get_average( array=self.coverage[surf_spec], weights=weights ) self.total_coverage = sum(self.coverage.values()) self.av_total_coverage = min(sum(self.av_coverage.values()), 100) self.dominant_ads = max(self.av_coverage, key=self.av_coverage.get) # Compute partial coverages (per total number of sites of each site_type) self.coverage_per_site_type = {st: {} for st in self.site_types} self.av_coverage_per_site_type = {st: {} for st in self.site_types} for i in range(5, 5 + self.n_surf_species): surf_spec = header[i] dent_sites = dent_sites_map[surf_spec] for site_type, n_sites_type in self.site_types.items(): # number of dentates of this species on this site_type n_dents_on_type = dent_sites.count(site_type) self.coverage_per_site_type[site_type][surf_spec] = ( data_specnum[:, i] * n_dents_on_type / n_sites_type * 100 ) self.av_coverage_per_site_type[site_type][surf_spec] = self.get_average( array=self.coverage_per_site_type[site_type][surf_spec], weights=weights ) self.total_coverage_per_site_type = {} self.av_total_coverage_per_site_type = {} self.dominant_ads_per_site_type = {} for site_type, cov_dict in self.av_coverage_per_site_type.items(): if cov_dict: self.total_coverage_per_site_type[site_type] = sum( self.coverage_per_site_type[site_type].values() ) self.av_total_coverage_per_site_type[site_type] = min( sum(cov_dict.values()), 100 ) self.dominant_ads_per_site_type[site_type] = max( cov_dict, key=cov_dict.get ) else: # no adsorption on this site_type self.total_coverage_per_site_type[site_type] = np.zeros_like(self.time) self.av_total_coverage_per_site_type[site_type] = 0.0 self.dominant_ads_per_site_type[site_type] = None def get_average(self, array, weights): """ Calculate the average of an array with optional weighting. Parameters ---------- array : np.ndarray The array of values to average. weights : str or None The weights to apply when calculating the average. Possible values are: - `None`: No weighting; all weights are set to 1. - `'time'`: Weights based on the differences in simulated time. - `'nevents'`: Weights based on the differences in the number of events. Returns ------- float The calculated average value. """ if weights not in [None, 'time', 'nevents']: raise KMCOutputError(f"'weights' must be one of the following: 'none' (default), 'time', or 'nevents'.") if len(array) == 1: # If the catalyst is poisoned, it could be that the last ∆t is very high and the time window only # contains one row. In that case, do not compute the average return float(array[0]) else: if weights is None: return np.average(array) elif weights == 'time': return np.average(array[1:], weights=np.diff(self.time)) elif weights == 'nevents': return np.average(array[1:], weights=np.diff(self.nevents)) @enforce_types def get_selectivity(self, main_product: str, side_products: list): """ Calculate the selectivity of the main product over side products. Parameters ---------- main_product : str Name of the main product. side_products : List[str] Names of the side products. Returns ------- float The selectivity of the main product (in %) over the side products. Notes ----- The selectivity is calculated as: selectivity = (TOF_main_product / (TOF_main_product + sum(TOF_side_products))) * 100 If the total TOF is zero, the selectivity is returned as NaN. """ selectivity = float('NaN') tof_side_products = 0.0 for side_product in side_products: tof_side_products += self.tof[side_product] if self.tof[main_product] + tof_side_products != 0: selectivity = self.tof[main_product] / (self.tof[main_product] + tof_side_products) * 100 return selectivity
def get_dentate_types(job_path: str = None, **kwargs): """ Retrieve surface species data including the number of dentates and the associated site types for each surface species. Parameters ---------- job_path : str, optional The path to the directory containing the simulation input files (simulation_input.dat and energetics_input.dat). (Previously named 'path'.) For backward compatibility, you can pass 'path' instead. Returns ------- dict A dictionary mapping each surface species name to a list of site types indexed by dentate number (1-based). """ import warnings # backward compatibility for 'path' if job_path is None: job_path = kwargs.pop('path', None) if job_path is None: raise TypeError("Missing required argument: job_path (or 'path' for backwards compatibility)") # parse simulation input for species and dentates sim_data = parse_simulation_input_file(input_file=f"{job_path}/simulation_input.dat") surf_specs = sim_data.get('surf_specs_names', []) dent_counts = sim_data.get('surf_specs_dent', []) species_dentates = dict(zip(surf_specs, dent_counts)) # default lattice: every dentate on default site if check_default_lattice(job_path=job_path): return {spec: ['StTp1'] * species_dentates[spec] for spec in surf_specs} # container for global species->dentate mapping species_mapping = {} # species -> {dentate_index: site_type} # read energetics_input.dat with open(os.path.join(job_path, 'energetics_input.dat'), 'r') as f: lines = f.readlines() i = 0 n = len(lines) while i < n: line = lines[i].strip() if line.startswith('cluster'): # record cluster identifier for messages parts = line.split(maxsplit=1) cluster_id = parts[1] if len(parts) > 1 else '' # reset block data sites_count = None state_entries = [] site_types = [] i += 1 # parse cluster block while i < n and not lines[i].strip().startswith('end_cluster'): text = lines[i].strip() if text.startswith('sites'): toks = text.split() if len(toks) >= 2 and toks[1].isdigit(): sites_count = int(toks[1]) if text.startswith('lattice_state'): i += 1 count = 0 while i < n and count < (sites_count or float('inf')): row = lines[i].strip() if not row or row.startswith('#'): i += 1 continue key = row.split()[0] if key in ('site_types', 'graph_multiplicity', 'cluster_eng', 'neighboring', 'angles'): break parts = row.split() if parts and parts[0] == '&': state_entries.append(None) elif len(parts) >= 3 and parts[0].isdigit() and parts[2].isdigit(): spec = parts[1] dent = int(parts[2]) state_entries.append((spec, dent)) i += 1 count += 1 continue if text.startswith('site_types'): toks = text.split() site_types = toks[1:] i += 1 # reached end_cluster if sites_count is None: raise EnergeticsModelError(f"Missing 'sites' declaration in cluster '{cluster_id}'.") if len(site_types) != sites_count: raise EnergeticsModelError( f"Cluster '{cluster_id}' declares {sites_count} sites but site_types has {len(site_types)} entries: {site_types}") if len(state_entries) != sites_count: raise EnergeticsModelError( f"Cluster '{cluster_id}' declares {sites_count} sites but lattice_state provided {len(state_entries)} entries.") # build per-cluster mapping: species -> {dentate: site_type} cluster_map = {} for idx in range(sites_count): entry = state_entries[idx] stype = site_types[idx] if entry is None: continue spec, dent = entry exp = species_dentates.get(spec) if exp is None: raise EnergeticsModelError( f"Species '{spec}' in cluster '{cluster_id}' not declared in simulation_input.dat.") if dent < 1 or dent > exp: raise EnergeticsModelError( f"Dentate index {dent} out of range for species '{spec}' in cluster '{cluster_id}' (expected 1..{exp}).") cluster_map.setdefault(spec, {})[dent] = stype # ensure full dentate coverage per species for spec, dent_dict in cluster_map.items(): if len(dent_dict) != species_dentates[spec]: raise EnergeticsModelError( f"Species '{spec}' in cluster '{cluster_id}' has {species_dentates[spec]} dentates but got {len(dent_dict)} entries.") # merge into global mapping with order-flexible consistency for spec, dent_dict in cluster_map.items(): if spec not in species_mapping: species_mapping[spec] = dent_dict.copy() else: existing_dict = species_mapping[spec] if existing_dict != dent_dict: # build ordered lists existing_list = [existing_dict[d] for d in sorted(existing_dict)] new_list = [dent_dict[d] for d in sorted(dent_dict)] # same multiset? then warn and keep existing order if sorted(existing_list) == sorted(new_list): warnings.warn( f"In cluster '{cluster_id}', dentates for species '{spec}' appear in {new_list} " f"instead of {existing_list}; keeping original order.") else: raise EnergeticsModelError( f"Inconsistent site types for species '{spec}' in cluster '{cluster_id}': " f"{existing_list} vs {new_list}") i += 1 # skip end_cluster else: i += 1 # finalize result result = {} for spec in surf_specs: if spec not in species_mapping: raise EnergeticsModelError( f"Species '{spec}' declared in simulation_input.dat but not found in energetics_input.dat.") dent_dict = species_mapping[spec] cnt = species_dentates[spec] sites = [None] * cnt for dent, stype in dent_dict.items(): sites[dent - 1] = stype result[spec] = sites return result def check_default_lattice(job_path: str = None, **kwargs): """ Check whether the simulation uses a default lattice configuration. Parameters ---------- job_path : str, optional The path to the directory containing the `lattice_input.dat` file. (Previously named 'path'.) For backward compatibility, you can pass 'path' instead. Returns ------- bool True if the `lattice_input.dat` file indicates a default lattice configuration, False otherwise. """ if job_path is None: job_path = kwargs.pop('path', None) if job_path is None: raise TypeError("Missing required argument: job_path (or 'path' for backwards compatibility)") with open(os.path.join(job_path, 'lattice_input.dat'), 'r') as file: for line in file: if 'lattice' in line and 'default_choice' in line: return True return False