diff --git a/src/atomate2/abinit/flows/bse.py b/src/atomate2/abinit/flows/bse.py new file mode 100644 index 0000000000..cb52d20019 --- /dev/null +++ b/src/atomate2/abinit/flows/bse.py @@ -0,0 +1,291 @@ +"""Core abinit flow makers.""" + +import numpy as np +from dataclasses import dataclass, field +from pathlib import Path +from typing import List, Optional, Union + +from jobflow import Flow, Maker, Response, job +from pymatgen.core.structure import Structure + +from atomate2.abinit.jobs.base import BaseAbinitMaker +from atomate2.abinit.jobs.core import NonSCFMaker, StaticMaker, ConvergenceMaker +from atomate2.abinit.jobs.bse import BSEmdfMaker, BSEscrMaker +from atomate2.abinit.flows.gw import G0W0Maker +from atomate2.abinit.powerups import update_user_abinit_settings, update_user_kpoints_settings +from pymatgen.io.abinit.abiobjects import KSampling + + +@dataclass +class BSEFlowMaker(Maker): + """ + Maker to generate workflow for BSE calculations. + + Parameters + ---------- + name : str + A name for the job + nscf_maker : .BaseAbinitMaker + The maker to use for the non-scf calculation. + bse_maker : .BaseAbinitMaker + The maker to use for the bse calculations. + kppa: integer + Grid density for k-mesh + shifts : tuple + Shift from gamma centered k-grid + mbpt_sciss : float + Scissor shift added to the conductions states in eV, + Default value 0.0 eV + mdf_epsinf : float + The value of the macroscopic dielectric function + used to model the screening function. + enwinbse : float + Energy window from band-edges in which all conduction + and valence bands are included in BSE calculations in eV. + Default value 3.0 eV + + """ + + name: str = "BSE calculation" + nscf_maker: BaseAbinitMaker = field(default_factory=NonSCFMaker) + bse_maker: BaseAbinitMaker = field(default_factory=BSEmdfMaker) + kppa: int = 1000 + shifts: tuple = (0.11, 0.22, 0.33) + mbpt_sciss: float = 0.0 + mdf_epsinf: float = None + enwinbse: float = 3.0 + + def make( + self, + structure: Structure, + prev_outputs: Union[str, Path, list[str]], + ): + + nscf_job = self.nscf_maker.make( + prev_outputs=prev_outputs[0], + mode="uniform", + ) + + nscf_job = update_user_kpoints_settings( + flow=nscf_job, + kpoints_updates=KSampling.automatic_density( + structure=structure, + kppa=self.kppa, + shifts=self.shifts, + chksymbreak=0) + ) + nscf_job = update_user_abinit_settings( + flow=nscf_job, + abinit_updates={"nstep": 50} + ) + bse_prepjob = self.find_bse_params( + nscf_job.output.output.bandlims, + self.enwinbse, + nscf_job.output.output.direct_gap + ) + + if len(prev_outputs)==2: + prev_outputs=[nscf_job.output.dir_name, prev_outputs[1]] + else: + prev_outputs=[nscf_job.output.dir_name] + + bse_job = self.bse_maker.make( + prev_outputs=prev_outputs, + mbpt_sciss=self.mbpt_sciss, + bs_loband=bse_prepjob.output["bs_loband"], + nband=bse_prepjob.output["nband"], + mdf_epsinf=self.mdf_epsinf, + bs_freq_mesh=bse_prepjob.output["bs_freq_mesh"] + ) + jobs=[nscf_job, bse_prepjob, bse_job] + + return Flow(jobs, output=bse_job.output, name=self.name) + + @job(name="Find BSE parameters") + def find_bse_params(self, bandlims, enwinbse, directgap): + vband=[] + cband=[] + for bandlim in bandlims: + spin=bandlim[0] + iband=bandlim[1]+1 + enemin=bandlim[2] + enemax=bandlim[3] + if enemin>0: + if enemin= nband in screening and sigma + # pass + + def make( + self, + structure: Structure, + restart_from: Optional[Union[str, Path]] = None, + ): + + static_job = self.scf_maker.make( + structure, + restart_from=restart_from + ) + convergence = 
ConvergenceMaker(
+            maker=self.bse_maker,
+            epsilon=self.epsilon,
+            criterion_name=self.criterion_name,
+            convergence_field=self.convergence_field,
+            convergence_steps=self.convergence_steps,
+        )
+
+        bse = convergence.make(structure, prev_outputs=[static_job.output.dir_name])
+
+        return Flow([static_job, bse], bse.output, name=self.name)
+
+@dataclass
+class BSEMultiShiftedMaker(Maker):
+    """
+    Maker to generate BSE absorption spectra averaged over multiple shifted k-grids.
+
+    Parameters
+    ----------
+    name : str
+        A name for the job
+    scf_maker : .BaseAbinitMaker
+        The maker to use for the scf calculation.
+    bse_maker : .BaseAbinitMaker
+        The maker to use for the bse calculations.
+    shiftks : list[tuple]
+        k-grid shifts to be used for multiple BSE calculations.
+        The resulting absorption spectra will be averaged.
+    """
+
+    name: str = "BSE Multiple Shifted Grid"
+    scf_maker: BaseAbinitMaker = field(default_factory=StaticMaker)
+    bse_maker: BaseAbinitMaker = field(default_factory=BSEFlowMaker)
+    shiftks: list = None
+
+    def make(
+        self,
+        structure: Structure,
+        restart_from: Optional[Union[str, Path]] = None,
+    ):
+        jobs = []
+        spectra = []
+        static_job = self.scf_maker.make(
+            structure,
+            restart_from=restart_from
+        )
+        jobs.append(static_job)
+        for idx, shifts in enumerate(self.shiftks):
+            bse_job = self.bse_maker.make(
+                structure=structure,
+                prev_outputs=[static_job.output.dir_name],
+            )
+            bse_job = update_user_abinit_settings(
+                flow=bse_job,
+                abinit_updates={"shiftk": shifts}
+            )
+            bse_job.append_name(append_str=f" {idx}")
+            jobs.append(bse_job)
+            spectra.append(
+                bse_job.output.output.emacro,
+            )
+        avg_job = self.calc_average_spectra(spectra)
+        jobs.append(avg_job)
+        return Flow(jobs, output=avg_job.output, name=self.name)
+
+    @job(name="Calculate average spectra")
+    def calc_average_spectra(self, spectra):
+        for idx, spectrum in enumerate(spectra):
+            if idx == 0:
+                mesh0 = spectrum[0]
+                teps2 = spectrum[1]
+            else:
+                mesh = spectrum[0]
+                int_eps2 = np.interp(mesh0, mesh, spectrum[1])
+                teps2 = np.add(teps2, int_eps2)
+        teps2 = np.array(teps2) * (1.0 / len(spectra))
+        conv_res = [mesh0, teps2]
+        return Response(output=conv_res)
+
+
diff --git a/src/atomate2/abinit/flows/gw.py b/src/atomate2/abinit/flows/gw.py
new file mode 100644
index 0000000000..5473a94c9c
--- /dev/null
+++ b/src/atomate2/abinit/flows/gw.py
@@ -0,0 +1,215 @@
+"""Core abinit flow makers."""
+
+import numpy as np
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import List, Optional, Union
+
+from jobflow import Flow, Maker, Response, job
+from pymatgen.core.structure import Structure
+
+from atomate2.abinit.jobs.base import BaseAbinitMaker
+from atomate2.abinit.jobs.core import NonSCFMaker, StaticMaker, ConvergenceMaker
+from atomate2.abinit.jobs.gw import ScreeningMaker, SigmaMaker
+from atomate2.abinit.powerups import update_user_abinit_settings
+
+
+@dataclass
+class G0W0Maker(Maker):
+    """
+    Maker to perform a G0W0 calculation from a previous GW bands calculation.
+
+    This is a screening calculation followed by a sigma calculation;
+    one can perform QP corrections only for the band edges (useful for
+    convergence calculations) or at all k-points.
+
+    Parameters
+    ----------
+    name : str
+        Name of the flows produced by this maker.
+    gw_qprange: int
+        0 - Compute the QP corrections only for the fundamental and the direct gap.
+
+        +num - Compute the QP corrections for all the k-points in the irreducible zone,
+        and include num bands above and below the Fermi level.
+
+        -num - Compute the QP corrections for all the k-points in the irreducible zone.
+        Include all occupied states and num empty states.
+
+    joblist : list[str]
+        Steps of GW calculations to be included.
+        Default is ["scf", "nscf", "scr", "sigma"],
+        which creates a workflow to perform the
+        entire GW calculation.
+    scf_maker : .BaseAbinitMaker
+        The maker to use for the scf calculation.
+    nscf_maker : .BaseAbinitMaker
+        The maker to use for the nscf calculations.
+    scr_maker : .BaseAbinitMaker
+        The maker to use for the screening calculation.
+    sigma_maker : .BaseAbinitMaker
+        The maker to use for the sigma calculations.
+    """
+
+    name: str = "G0W0 calculation"
+    gw_qprange: int = 0
+    joblist: List = field(default_factory=lambda: ["scf", "nscf", "scr", "sigma"])
+    scf_maker: BaseAbinitMaker = field(default_factory=StaticMaker)
+    nscf_maker: BaseAbinitMaker = field(default_factory=NonSCFMaker)
+    scr_maker: BaseAbinitMaker = field(default_factory=ScreeningMaker)
+    sigma_maker: BaseAbinitMaker = field(default_factory=SigmaMaker)
+
+    def make(
+        self,
+        structure: Structure,
+        prev_outputs: Optional[list] = None,
+        restart_from: Optional[Union[str, Path]] = None,
+    ):
+        """
+        Create a G0W0 flow.
+
+        Parameters
+        ----------
+        structure : Structure
+            A pymatgen structure object.
+        prev_outputs : list of str, optional
+            List of previous directories where the scf, nscf and scr calculations were done.
+        restart_from : str or Path or None
+            One previous directory to restart from.
+
+        Returns
+        -------
+        Flow
+            A G0W0 flow.
+        """
+        prev_outputs = list(prev_outputs) if prev_outputs is not None else []
+        joblist = self.joblist
+        jobs = []
+        result = {}
+
+        # SCF step
+        scf_job = self.scf_maker.make(
+            structure,
+            restart_from=restart_from
+        )
+        if "scf" in joblist:
+            if len(prev_outputs) != 0:
+                raise RuntimeError("No previous calculation needed in prev_outputs")
+            jobs.append(scf_job)
+            prev_outputs = [scf_job.output.dir_name]
+            result = scf_job.output
+
+        # NSCF step
+        if "nscf" in joblist:
+            if len(prev_outputs) != 1:
+                raise RuntimeError("Previous SCF calculation needed in prev_outputs")
+            nscf_job = self.nscf_maker.make(
+                prev_outputs=[prev_outputs[0]],
+                mode="uniform",
+            )
+            jobs.append(nscf_job)
+            prev_outputs = [nscf_job.output.dir_name]
+            result = nscf_job.output
+
+        # SCR step
+        if "scr" in joblist:
+            if len(prev_outputs) != 1:
+                raise RuntimeError("Previous NSCF calculation needed in prev_outputs")
+            scr_job = self.scr_maker.make(
+                prev_outputs=[prev_outputs[0]],
+            )
+            scr_job = update_user_abinit_settings(
+                flow=scr_job,
+                abinit_updates={"iomode": 3}
+            )
+            jobs.append(scr_job)
+            prev_outputs.append(scr_job.output.dir_name)
+            result = scr_job.output
+
+        # SIGMA step
+        if "sigma" in joblist:
+            if len(prev_outputs) != 2:
+                raise RuntimeError("Previous NSCF and SCR calculations needed in prev_outputs")
+            sigma_job = self.sigma_maker.make(
+                prev_outputs=[prev_outputs[0], prev_outputs[1]],
+            )
+            sigma_job = update_user_abinit_settings(
+                flow=sigma_job,
+                abinit_updates={"gw_qprange": self.gw_qprange, "iomode": 3}
+            )
+            jobs.append(sigma_job)
+            result = sigma_job.output
+
+        return Flow(jobs, output=result, name=self.name)
+
+@dataclass
+class G0W0ConvergenceMaker(Maker):
+    """
+    Maker to generate convergence of G0W0 calculations.
+
+    Parameters
+    ----------
+    name : str
+        A name for the job
+    criterion_name: str
+        A name for the convergence criterion.
Must be in the run results + epsilon: float + A difference in criterion value for subsequent runs + convergence_field: str + An input parameter that changes to achieve convergence + convergence_steps: list | tuple + An iterable of the possible values for the convergence field. + If the iterable is depleted and the convergence is not reached, + that the job is failed + """ + + name: str = "GW convergence" + criterion_name: str = "bandgap" + epsilon: float = 0.1 + convergence_field: str = field(default_factory=str) + convergence_steps: list = field(default_factory=list) + + def make( + self, + structure: Structure, + restart_from: Optional[Union[str, Path]] = None, + ): + + NSCF_FIELDS = ["nband","ngkpt"] + SCR_FIELDS = ["ecuteps"] + SIGMA_FIELDS = ["ecutsigx"] + + SUPPORTED_FIELDS = NSCF_FIELDS + SCR_FIELDS + SIGMA_FIELDS + + if self.convergence_field not in SUPPORTED_FIELDS: + raise RuntimeError("convergence_field not supported yet") + + if self.convergence_field in NSCF_FIELDS: + static_job = G0W0Maker(joblist=["scf"]).make(structure) + gw_maker = G0W0Maker(joblist=["nscf","scr","sigma"]) + flow=[static_job] + prev_outputs=[static_job.output.dir_name] + + if self.convergence_field in SCR_FIELDS: + static_job = G0W0Maker(joblist=["scf","nscf"]).make(structure) + gw_maker = G0W0Maker(joblist=["scr","sigma"]) + flow=[static_job] + prev_outputs=[static_job.output.dir_name] + + if self.convergence_field in SIGMA_FIELDS: + pre_static_job = G0W0Maker(joblist=["scf","nscf"]).make(structure) + static_job = G0W0Maker(joblist=["scr"]).make(structure, prev_outputs=[pre_static_job.output.dir_name]) + gw_maker = G0W0Maker(joblist=["sigma"]) + flow=[pre_static_job,static_job] + prev_outputs=[pre_static_job.output.dir_name, static_job.output.dir_name] + + convergence = ConvergenceMaker( + maker = gw_maker, + epsilon = self.epsilon, + criterion_name = self.criterion_name, + convergence_field = self.convergence_field, + convergence_steps = self.convergence_steps, + ) + + gw_job = convergence.make(structure, prev_outputs=prev_outputs) + flow.append(gw_job) + return Flow(flow, gw_job.output, name=self.name) + diff --git a/src/atomate2/abinit/jobs/bse.py b/src/atomate2/abinit/jobs/bse.py new file mode 100644 index 0000000000..94121e1449 --- /dev/null +++ b/src/atomate2/abinit/jobs/bse.py @@ -0,0 +1,119 @@ +"""Core jobs for running ABINIT calculations.""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +import numpy as np +from jobflow import Response, job + +from atomate2.abinit.jobs.base import BaseAbinitMaker +from atomate2.abinit.sets.bse import BSEmdfSetGenerator, BSEscrSetGenerator + +logger = logging.getLogger(__name__) + +__all__ = ["BSEmdfMaker", "BSEscrMaker"] + + +@dataclass +class BSEscrMaker(BaseAbinitMaker): + """Maker to create BSE with full dielectric function calculations.""" + + calc_type: str = "bse_scr" + name: str = "BSE scr calculation" + + input_set_generator: BSEscrSetGenerator = field( + default_factory=BSEscrSetGenerator + ) + + @job + def make( + self, + structure: Structure | None = None, + prev_outputs: str | list[str] | None = None, + mdf_epsinf: float | None = None, + mbpt_sciss: float = 0.0, + bs_loband: float = 0.0, + nband: float = 0.0, + bs_freq_mesh: list[float] | None = None, + ) -> Job: + """ + Run a non-scf ABINIT job. + + Parameters + ---------- + structure : .Structure + A pymatgen structure object. + mode : str + Type of band structure calculation. Options are: + - "line": Full band structure along symmetry lines. 
+ - "uniform": Uniform mesh band structure. + """ + + if len(prev_outputs)!=2: + raise RuntimeError("Need previous SCF and SCREENING calculations") + + self.input_set_generator.factory_kwargs = {"mbpt_sciss": mbpt_sciss, + "bs_loband": bs_loband, + "nband": nband, + "mdf_epsinf": mdf_epsinf, + "bs_freq_mesh": bs_freq_mesh + } + + return super().make.original( + self, + structure=structure, + prev_outputs=prev_outputs, + ) + +@dataclass +class BSEmdfMaker(BaseAbinitMaker): + """Maker to create BSE with model dielectric function calculations.""" + + calc_type: str = "bse_mdf" + name: str = "BSE mdf calculation" + + input_set_generator: BSEmdfSetGenerator = field( + default_factory=BSEmdfSetGenerator + ) + + @job + def make( + self, + structure: Structure | None = None, + prev_outputs: str | list[str] | None = None, + mdf_epsinf: float | None = None, + mbpt_sciss: float = 0.0, + bs_loband: float = 0.0, + nband: float = 0.0, + bs_freq_mesh: list[float] | None = None, + ) -> Job: + """ + Run a non-scf ABINIT job. + + Parameters + ---------- + structure : .Structure + A pymatgen structure object. + mode : str + Type of band structure calculation. Options are: + - "line": Full band structure along symmetry lines. + - "uniform": Uniform mesh band structure. + """ + if mdf_epsinf==None: + raise RuntimeError("Need a value of mdf_epsinf") + if len(prev_outputs)!=1: + raise RuntimeError("Need only one previous SCF calculation") + + self.input_set_generator.factory_kwargs = {"mbpt_sciss": mbpt_sciss, + "bs_loband": bs_loband, + "nband": nband, + "mdf_epsinf": mdf_epsinf, + "bs_freq_mesh": bs_freq_mesh + } + + return super().make.original( + self, + structure=structure, + prev_outputs=prev_outputs, + ) diff --git a/src/atomate2/abinit/jobs/core.py b/src/atomate2/abinit/jobs/core.py index 2e1e73d9d6..ffe1a1b72b 100644 --- a/src/atomate2/abinit/jobs/core.py +++ b/src/atomate2/abinit/jobs/core.py @@ -3,6 +3,8 @@ from __future__ import annotations import logging +import json +from pathlib import Path from dataclasses import dataclass, field from typing import TYPE_CHECKING, ClassVar @@ -12,9 +14,13 @@ RelaxConvergenceWarning, ScfConvergenceWarning, ) -from jobflow import Job, job +from jobflow import Job, job, Maker, Response, Flow from atomate2.abinit.jobs.base import BaseAbinitMaker +from atomate2.abinit.powerups import update_user_abinit_settings, update_user_kpoints_settings +from atomate2.abinit.utils.common import check_convergence +from pymatgen.io.abinit.abiobjects import KSampling +from atomate2.abinit.schemas.task import AbinitTaskDoc, ConvergenceSummary from atomate2.abinit.sets.core import ( LineNonSCFSetGenerator, NonSCFSetGenerator, @@ -33,6 +39,10 @@ from atomate2.abinit.utils.history import JobHistory logger = logging.getLogger(__name__) +CONVERGENCE_FILE_NAME = "convergence.json" + +__all__ = ["StaticMaker", "NonSCFMaker", "RelaxMaker", "ConvergenceMaker"] + @dataclass @@ -194,3 +204,186 @@ def full_relaxation(cls, *args, **kwargs) -> Job: return cls( input_set_generator=RelaxSetGenerator(*args, relax_cell=True, **kwargs) ) +@dataclass +class ConvergenceMaker(Maker): + """A job that performs convergence run for a given number of steps. Stops either + when all steps are done, or when the convergence criterion is reached, that is when + the absolute difference between the subsequent values of the convergence field is + less than a given epsilon. 
+ + Parameters + ---------- + name : str + A name for the job + maker: .BaseAbinitMaker + A maker for the run + criterion_name: str + A name for the convergence criterion. Must be in the run results + epsilon: float + A difference in criterion value for subsequent runs + convergence_field: str + An input parameter that changes to achieve convergence + convergence_steps: list | tuple + An iterable of the possible values for the convergence field. + If the iterable is depleted and the convergence is not reached, + that the job is failed + """ + + name: str = "Convergence job" + maker: BaseAbinitMaker = field(default_factory=BaseAbinitMaker) + criterion_name: str = "energy_per_atom" + epsilon: float = 0.001 + convergence_field: str = field(default_factory=str) + convergence_steps: list = field(default_factory=list) + + def __post_init__(self): + self.last_idx = len(self.convergence_steps) + + def make( + self, + structure: Structure, + prev_outputs: str | list[str] | Path = None + ): + """A top-level flow controlling convergence iteration + + Parameters + ---------- + atoms : MSONableAtoms + a structure to run a job + """ + convergence_job = self.convergence_iteration(structure, prev_outputs=prev_outputs) + return Flow([convergence_job], output=convergence_job.output) + + @job + def convergence_iteration( + self, + structure: Structure, + prev_dir: str | Path = None, + prev_outputs: str | list[str] | Path = None, + ) -> Response: + """ + Runs several jobs with changing inputs consecutively to investigate + convergence in the results + + Parameters + ---------- + structure : Structure + The structure to run the job for + prev_dir: str | None + An Abinit calculation directory in which previous run contents are stored + + Returns + ------- + The output response for the job + """ + + # getting the calculation index + idx = 0 + converged = False + num_prev_outputs=len(prev_outputs) + if prev_dir is not None: + prev_dir = prev_dir.split(":")[-1] + convergence_file = Path(prev_dir) / CONVERGENCE_FILE_NAME + idx += 1 + if convergence_file.exists(): + with open(convergence_file) as f: + data = json.load(f) + idx = data["idx"] + 1 + # check for convergence + converged = data["converged"] + + if idx < self.last_idx and not converged: + # finding next jobs + if self.convergence_field=="kppa": + next_base_job = self.maker.make( + structure, + prev_outputs=prev_outputs, + kppa=self.convergence_steps[idx]) + print(idx,self.convergence_steps[idx]) + else: + base_job = self.maker.make( + structure, + prev_outputs=prev_outputs) + next_base_job = update_user_abinit_settings( + flow=base_job, + abinit_updates={ + self.convergence_field: self.convergence_steps[idx]}) + next_base_job.append_name(append_str=f" {idx}") + update_file_job = self.update_convergence_file( + prev_dir=prev_dir, + job_dir=next_base_job.output.dir_name, + output=next_base_job.output) + prev_outputs=prev_outputs[:num_prev_outputs] + next_job = self.convergence_iteration( + structure, + prev_dir=next_base_job.output.dir_name, + prev_outputs=prev_outputs) + replace_flow = Flow( + [next_base_job, update_file_job, next_job], output=next_base_job.output) + return Response(detour=replace_flow, output=replace_flow.output) + else: + task_doc = AbinitTaskDoc.from_directory(prev_dir) + summary = ConvergenceSummary.from_abinit_calc_doc(task_doc) + return summary + + @job(name="Writing a convergence file") + def update_convergence_file( + self, prev_dir: str | Path, job_dir: str | Path, output + ): + """Write a convergence file + + Parameters + 
----------
+        prev_dir: str or Path
+            The directory of the previous convergence iteration, if any
+            (None for the first iteration).
+        job_dir: str or Path
+            The directory of the job whose result is being recorded; the
+            updated convergence file is written there.
+        output
+            The output of that job, from which the value of the convergence
+            criterion is read.
+        """
+        idx = 0
+        if prev_dir is not None:
+            prev_dir = prev_dir.split(":")[-1]
+            convergence_file = Path(prev_dir) / CONVERGENCE_FILE_NAME
+            if convergence_file.exists():
+                with open(convergence_file) as f:
+                    convergence_data = json.load(f)
+                    idx = convergence_data["idx"] + 1
+        else:
+            idx = 0
+            convergence_data = {
+                "criterion_name": self.criterion_name,
+                "asked_epsilon": self.epsilon,
+                "criterion_values": [],
+                "convergence_field_name": self.convergence_field,
+                "convergence_field_values": [],
+                "converged": False,
+            }
+        convergence_data["convergence_field_values"].append(self.convergence_steps[idx])
+        convergence_data["criterion_values"].append(
+            getattr(output.output, self.criterion_name)
+        )
+        convergence_data["idx"] = idx
+        if len(convergence_data["criterion_values"]) > 1:
+            # checking for convergence
+            conv = check_convergence(
+                convergence_data["criterion_values"][-1],
+                convergence_data["criterion_values"][-2],
+            )
+            convergence_data["converged"] = bool(conv < self.epsilon)
+
+        job_dir = job_dir.split(":")[-1]
+        convergence_file = Path(job_dir) / CONVERGENCE_FILE_NAME
+        with open(convergence_file, "w") as f:
+            json.dump(convergence_data, f)
+
+    @job(name="Getting the results")
+    def get_results(self, prev_dir: Path | str) -> dict[str, Any]:
+        """Get the results for a calculation from a given directory.
+
+        Parameters
+        ----------
+        prev_dir: Path or str
+            The calculation directory to get the results for
+
+        Returns
+        -------
+        The results dictionary loaded from the JSON file
+        """
+        convergence_file = Path(prev_dir) / CONVERGENCE_FILE_NAME
+        with open(convergence_file) as f:
+            return json.load(f)
+
+
diff --git a/src/atomate2/abinit/jobs/gw.py b/src/atomate2/abinit/jobs/gw.py
new file mode 100644
index 0000000000..c92b2b4cda
--- /dev/null
+++ b/src/atomate2/abinit/jobs/gw.py
@@ -0,0 +1,51 @@
+"""Core jobs for running ABINIT GW calculations."""
+
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass, field
+import numpy as np
+from jobflow import Response, job
+
+from atomate2.abinit.jobs.base import BaseAbinitMaker
+from atomate2.abinit.sets.gw import ScreeningSetGenerator, SigmaSetGenerator
+
+logger = logging.getLogger(__name__)
+
+__all__ = ["ScreeningMaker", "SigmaMaker"]
+
+
+@dataclass
+class ScreeningMaker(BaseAbinitMaker):
+    """Maker to create ABINIT screening (SCR) jobs.
+
+    Parameters
+    ----------
+    name : str
+        The job name.
+ """ + + calc_type: str = "scr" + name: str = "Screening calculation" + + input_set_generator: ScreeningSetGenerator = field( + default_factory=ScreeningSetGenerator + ) + + # CRITICAL_EVENTS: ClassVar[Sequence[str]] = ("ScfConvergenceWarning",) + + +@dataclass +class SigmaMaker(BaseAbinitMaker): + """Maker to create non SCF calculations.""" + + calc_type: str = "sigma" + name: str = "Sigma calculation" + + input_set_generator: SigmaSetGenerator = field( + default_factory=SigmaSetGenerator + ) + + # CRITICAL_EVENTS: ClassVar[Sequence[str]] = ("ScfConvergenceWarning",) + + diff --git a/src/atomate2/abinit/schemas/calculation.py b/src/atomate2/abinit/schemas/calculation.py index 64b694ef5b..9a21af8276 100644 --- a/src/atomate2/abinit/schemas/calculation.py +++ b/src/atomate2/abinit/schemas/calculation.py @@ -4,6 +4,8 @@ import logging import os +import pandas as pd +import numpy as np from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING, Optional, Union @@ -12,6 +14,9 @@ pass from abipy.electrons.gsr import GsrFile +from abipy.electrons.scr import ScrFile +from abipy.electrons.gw import SigresFile +from abipy.electrons.bse import MdfFile from abipy.flowtk import events from abipy.flowtk.utils import File from emmet.core.math import Matrix3D, Vector3D @@ -67,6 +72,8 @@ class CalculationOutput(BaseModel): The conduction band minimum in eV (if system is not metallic vbm: float The valence band maximum in eV (if system is not metallic) + emacro: abipy Function1D object + Macroscopic dielectric function """ energy: float = Field( @@ -92,7 +99,7 @@ class CalculationOutput(BaseModel): bandgap: Optional[float] = Field( None, description="The band gap from the calculation in eV" ) - direct_bandgap: Optional[float] = Field( + direct_gap: Optional[float] = Field( None, description="The direct band gap from the calculation in eV" ) cbm: Optional[float] = Field( @@ -105,6 +112,21 @@ class CalculationOutput(BaseModel): description="The valence band maximum, or HOMO for molecules, in eV " "(if system is not metallic)", ) + emacro: Optional[list] = Field( + None, + description="Macroscopic dielectric function", + ) + mbpt_sciss: Optional[float] = Field( + None, + description="Scissor shift (scalar) obtained from GW calculation", + ) + bandlims: Optional[list] = Field( + None, + description="Minimum and Maximum energies of each bands", + ) + + class Config: + arbitrary_types_allowed = True @classmethod def from_abinit_gsr( @@ -127,24 +149,34 @@ def from_abinit_gsr( # In case no conduction bands were included try: + vbm = output.ebands.get_edge_state("vbm").eig cbm = output.ebands.get_edge_state("cbm").eig bandgap = output.ebands.fundamental_gaps[ 0 ].energy # [0] for one spin channel only - direct_bandgap = output.ebands.direct_gaps[0].energy + direct_gap = output.ebands.direct_gaps[0].energy except ValueError: cbm = None bandgap = None - direct_bandgap = None + direct_gap = None + + bandlims=[] + for spin in output.ebands.spins: + for iband in range(output.ebands.nband): + enemin=output.ebands.enemin(spin=spin,band=iband)-vbm + enemax=output.ebands.enemax(spin=spin,band=iband)-vbm + bandlims.append([spin, iband, enemin, enemax]) electronic_output = { "efermi": float(output.ebands.fermie), - "vbm": output.ebands.get_edge_state("vbm").eig, + "vbm": vbm, "cbm": cbm, "bandgap": bandgap, - "direct_bandgap": direct_bandgap, + "direct_gap": direct_gap, + "bandlims": bandlims, } + forces = None if output.cart_forces is not None: forces = output.cart_forces.tolist() @@ -161,6 
+193,100 @@ def from_abinit_gsr( forces=forces, stress=stress, ) + @classmethod + def from_abinit_scr( + cls, + output: ScrFile, # Must use auto_load kwarg when passed + ) -> CalculationOutput: + """ + Create an Abinit output document from Abinit outputs. + + Parameters + ---------- + output: .AbinitOutput + An AbinitOutput object. + + Returns + ------- + The Abinit calculation output document. + """ + structure = output.structure # final structure by default for GSR + + return cls( + structure=structure, + energy=0.0, + energy_per_atom=0.0, + efermi=0.0, + ) + @classmethod + def from_abinit_sig( + cls, + output: SigresFile, # Must use auto_load kwarg when passed + ) -> CalculationOutput: + """ + Create an Abinit output document from Abinit outputs. + + Parameters + ---------- + output: .AbinitOutput + An AbinitOutput object. + + Returns + ------- + The Abinit calculation output document. + """ + structure = output.structure # final structure by default for GSR + + # In case no conduction bands were included + ivbm=output.ebands.get_edge_state("vbm") + icbm=output.ebands.get_edge_state("cbm") + vbm=output.get_qpcorr(ivbm.spin, ivbm.kpoint, ivbm.band).re_qpe + cbm=output.get_qpcorr(icbm.spin, icbm.kpoint, icbm.band).re_qpe + ks_gap=icbm.eig-ivbm.eig + bandgap=cbm-vbm + mbpt_sciss=bandgap-ks_gap + electronic_output = { + "efermi": float(output.ebands.fermie), + "vbm": vbm, + "cbm": cbm, + "bandgap": bandgap, + "mbpt_sciss": mbpt_sciss, + } + return cls( + structure=structure, + energy=0.0, + energy_per_atom=0.0, + **electronic_output, + ) + @classmethod + def from_abinit_mdf( + cls, + output: MdfFile, # Must use auto_load kwarg when passed + ) -> CalculationOutput: + """ + Create an Abinit output document from Abinit outputs. + + Parameters + ---------- + output: .AbinitOutput + An AbinitOutput object. + + Returns + ------- + The Abinit calculation output document. + """ + structure = output.structure # final structure by default for GSR + gw_eps=output.get_mdf(mdf_type="exc").emacro_avg + emacromesh=gw_eps.mesh + emacrovalues=gw_eps.values.imag + emacro=[emacromesh, emacrovalues] + return cls( + structure=structure, + energy=0.0, + energy_per_atom=0.0, + efermi=0.0, + emacro=emacro + ) class Calculation(BaseModel): @@ -208,9 +334,14 @@ def from_abinit_files( cls, dir_name: Path | str, task_name: str, + abinit_gsr_file: Path | str = "out_GSR.nc", + abinit_scr_file: Path | str = "out_SCR.nc", + abinit_sig_file: Path | str = "out_SIGRES.nc", + abinit_mdf_file: Path | str = "out_MDF.nc", abinit_log_file: Path | str = LOG_FILE_NAME, abinit_abort_file: Path | str = MPIABORTFILE, + ) -> tuple[Calculation, dict[AbinitObject, dict]]: """ Create an Abinit calculation document from a directory and file paths. 
@@ -235,14 +366,32 @@ def from_abinit_files( """ dir_name = Path(dir_name) abinit_gsr_file = dir_name / abinit_gsr_file + abinit_scr_file = dir_name / abinit_scr_file + abinit_sig_file = dir_name / abinit_sig_file + abinit_mdf_file = dir_name / abinit_mdf_file abinit_log_file = dir_name / abinit_log_file abinit_abort_file = dir_name / abinit_abort_file - - abinit_gsr = GsrFile.from_file(abinit_gsr_file) + if os.path.isfile(abinit_gsr_file): + abinit_gsr = GsrFile.from_file(abinit_gsr_file) + output_doc = CalculationOutput.from_abinit_gsr(abinit_gsr) + abinit_version = abinit_gsr.abinit_version + elif os.path.isfile(abinit_scr_file): + abinit_scr = ScrFile.from_file(abinit_scr_file) + output_doc = CalculationOutput.from_abinit_scr(abinit_scr) + abinit_version = abinit_scr.abinit_version + elif os.path.isfile(abinit_sig_file): + abinit_sig = SigresFile.from_file(abinit_sig_file) + output_doc = CalculationOutput.from_abinit_sig(abinit_sig) + abinit_version = abinit_sig.abinit_version + elif os.path.isfile(abinit_mdf_file): + abinit_mdf = MdfFile.from_file(abinit_mdf_file) + output_doc = CalculationOutput.from_abinit_mdf(abinit_mdf) + abinit_version = abinit_mdf.abinit_version + else: + print("No ouput file found.") completed_at = str(datetime.fromtimestamp(os.stat(abinit_log_file).st_mtime)) - output_doc = CalculationOutput.from_abinit_gsr(abinit_gsr) report = None has_abinit_completed = TaskState.FAILED @@ -267,7 +416,7 @@ def from_abinit_files( cls( dir_name=str(dir_name), task_name=task_name, - abinit_version=abinit_gsr.abinit_version, + abinit_version=abinit_version, has_abinit_completed=has_abinit_completed, completed_at=completed_at, output=output_doc, diff --git a/src/atomate2/abinit/schemas/task.py b/src/atomate2/abinit/schemas/task.py index 5a24b97c1d..90de4dbea1 100644 --- a/src/atomate2/abinit/schemas/task.py +++ b/src/atomate2/abinit/schemas/task.py @@ -2,12 +2,14 @@ from __future__ import annotations +import json import logging from collections.abc import Sequence from pathlib import Path from typing import Any, Optional, TypeVar, Union from abipy.abio.inputs import AbinitInput +from abipy.core.func1d import Function1D from abipy.flowtk import events from emmet.core.math import Matrix3D, Vector3D from emmet.core.structure import StructureMetadata @@ -16,7 +18,7 @@ from atomate2.abinit.files import load_abinit_input from atomate2.abinit.schemas.calculation import AbinitObject, Calculation, TaskState -from atomate2.abinit.utils.common import LOG_FILE_NAME, MPIABORTFILE +from atomate2.abinit.utils.common import LOG_FILE_NAME, MPIABORTFILE, check_convergence from atomate2.utils.datetime import datetime_str from atomate2.utils.path import get_uri, strip_hostname @@ -101,6 +103,9 @@ class OutputDoc(BaseModel): bandgap: Optional[float] = Field( None, description="The DFT bandgap for the last calculation" ) + direct_gap: Optional[float] = Field( + None, description="The direct DFT bandgap for the last calculation" + ) cbm: Optional[float] = Field(None, description="CBM for this calculation") vbm: Optional[float] = Field(None, description="VBM for this calculation") forces: Optional[list[Vector3D]] = Field( @@ -109,6 +114,17 @@ class OutputDoc(BaseModel): stress: Optional[Matrix3D] = Field( None, description="Stress on the unit cell from the last calculation" ) + emacro: Optional[list] = Field( + None, description="Macroscopic dielectric function" + ) + mbpt_sciss: Optional[float] = Field( + None, description="Scissor shift (scalar) from GW calculation" + ) + bandlims: Optional[list] = 
Field( + None, description="Minimum and Maximum energies of each bands" + ) + class Config: + arbitrary_types_allowed = True @classmethod def from_abinit_calc_doc(cls, calc_doc: Calculation) -> OutputDoc: @@ -129,10 +145,14 @@ def from_abinit_calc_doc(cls, calc_doc: Calculation) -> OutputDoc: energy=calc_doc.output.energy, energy_per_atom=calc_doc.output.energy_per_atom, bandgap=calc_doc.output.bandgap, + direct_gap=calc_doc.output.direct_gap, cbm=calc_doc.output.cbm, vbm=calc_doc.output.vbm, forces=calc_doc.output.forces, stress=calc_doc.output.stress, + emacro=calc_doc.output.emacro, + mbpt_sciss=calc_doc.output.mbpt_sciss, + bandlims=calc_doc.output.bandlims, ) @@ -234,7 +254,6 @@ class AbinitTaskDoc(StructureMetadata): additional_json: Optional[dict[str, Any]] = Field( None, description="Additional json loaded from the calculation directory" ) - @classmethod def from_directory( cls: type[_T], @@ -364,7 +383,12 @@ def _get_task_files(files: list[Path], suffix: str = "") -> dict: abinit_files["abinit_log_file"] = Path(file).relative_to(path) elif file.match(f"*{MPIABORTFILE}{suffix}*"): abinit_files["abinit_abort_file"] = Path(file).relative_to(path) - + if file.match(f"*outdata/out_SCR{suffix}*"): + abinit_files["abinit_scr_file"] = Path(file).relative_to(path) + if file.match(f"*outdata/out_SIGRES{suffix}*"): + abinit_files["abinit_sig_file"] = Path(file).relative_to(path) + if file.match(f"*outdata/out_MDF{suffix}*"): + abinit_files["abinit_mdf_file"] = Path(file).relative_to(path) return abinit_files for task_name in task_names: @@ -388,3 +412,71 @@ def _get_task_files(files: list[Path], suffix: str = "") -> dict: task_files["standard"] = standard_files return task_files + +class ConvergenceSummary(BaseModel): + """Summary of the outputs for an Abinit convergence calculation.""" + + structure: Structure = Field(None, description="The output structure object") + converged: bool = Field(None, description="Is convergence achieved?") + + convergence_criterion_name: str = Field( + None, description="The output name of the convergence criterion" + ) + convergence_field_name: str = Field( + None, description="The name of the input setting to study convergence against" + ) + convergence_criterion_value: Union[float, list] = Field( + None, description="The output value of the convergence criterion" + ) + convergence_field_value: Any = Field( + None, + description="The last value of the input setting to study convergence against", + ) + asked_epsilon: float = Field( + None, + description="The difference in the values for the convergence criteria that was asked for", + ) + actual_epsilon: float = Field( + None, description="The actual difference in the convergence criteria values" + ) + + @classmethod + def from_abinit_calc_doc(cls, calc_doc: Calculation) -> "ConvergenceSummary": + """ + Create a summary of Abinit calculation outputs from an Abinit calculation document. + + Parameters + ---------- + calc_doc + An Abinit calculation document. + + Returns + ------- + :ConvergenceSummary + The summary for convergence runs. 
+ """ + + from atomate2.abinit.jobs.core import CONVERGENCE_FILE_NAME + + job_dir = calc_doc.dir_name.split(":")[-1] + + convergence_file = Path(job_dir) / CONVERGENCE_FILE_NAME + if not convergence_file.exists(): + raise ValueError( + f"Did not find the convergence json file {CONVERGENCE_FILE_NAME} in {calc_doc.dir_name}" + ) + + with open(convergence_file) as f: + convergence_data = json.load(f) + actual_epsilon=check_convergence(convergence_data["criterion_values"][-1], convergence_data["criterion_values"][-2]) + + return cls( + structure=calc_doc.output.structure, + converged=convergence_data["converged"], + convergence_criterion_name=convergence_data["criterion_name"], + convergence_field_name=convergence_data["convergence_field_name"], + convergence_criterion_value=convergence_data["criterion_values"][-1], + convergence_field_value=convergence_data["convergence_field_values"][-1], + asked_epsilon=convergence_data["asked_epsilon"], + actual_epsilon=actual_epsilon, + ) diff --git a/src/atomate2/abinit/sets/bse.py b/src/atomate2/abinit/sets/bse.py new file mode 100644 index 0000000000..faf99aa37b --- /dev/null +++ b/src/atomate2/abinit/sets/bse.py @@ -0,0 +1,118 @@ +"""Module defining Abinit input set generators specific to GW calculations.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Callable + +from atomate2.abinit.sets.factories import bse_with_mdf_from_inputs +from abipy.abio.input_tags import SCF, NSCF, SCREENING, SIGMA + +from atomate2.abinit.files import load_abinit_input +from atomate2.abinit.sets.base import AbinitInputGenerator +from atomate2.abinit.sets.core import NonSCFSetGenerator +from pymatgen.io.abinit.abiobjects import KSampling + +if TYPE_CHECKING: + from abipy.abio.inputs import AbinitInput + from pymatgen.core import Structure + from pymatgen.io.abinit import PseudoTable + +__all__ = [ + "BSEmdfSetGenerator", + "BSEscrSetGenerator", +] + + +@dataclass +class BSEmdfSetGenerator(AbinitInputGenerator): + """Class to generate Abinit BSE with model dielectric function input sets.""" + + calc_type: str = "bse_mdf" + factory: Callable = bse_with_mdf_from_inputs + pseudos: str | list[str] | PseudoTable | None = None + prev_outputs_deps: tuple = (f"{NSCF}:WFK",) + factory_kwargs: dict = field(default_factory=dict) + factory_prev_inputs_kwargs: dict | None = field( + default_factory=lambda: {"nscf_input": (NSCF,),} + ) + def get_abinit_input( + self, + structure: Structure | None = None, + pseudos: PseudoTable | None = None, + prev_outputs: list[str] | None = None, + abinit_settings: dict | None = None, + factory_kwargs: dict | None = None, + kpoints_settings: dict | KSampling | None = None, + ) -> AbinitInput: + + """Get AbinitInput object for BSE calculation.""" + if prev_outputs is None: + raise RuntimeError("No previous_outputs. Cannot perform BSE calculation.") + if len(prev_outputs) != 1: + raise RuntimeError( + "Should have exactly one previous outputs for mdf-BSE (one NSCF calculation)." 
+ ) + ab1 = load_abinit_input(prev_outputs[0]) + if NSCF not in ab1.runlevel: + raise RuntimeError("Could not find one NSCF calculation.") + + return super().get_abinit_input( + structure=structure, + pseudos=pseudos, + prev_outputs=prev_outputs, + factory_kwargs=factory_kwargs, + kpoints_settings=kpoints_settings, + ) + +@dataclass +class BSEscrSetGenerator(AbinitInputGenerator): + """Class to generate Abinit BSE with full dielectric function input sets.""" + + calc_type: str = "bse_full" + factory: Callable = bse_with_mdf_from_inputs + pseudos: str | list[str] | PseudoTable | None = None + prev_outputs_deps: tuple = (f"{NSCF}:WFK", f"{SCREENING}:SCR",) + factory_kwargs: dict = field(default_factory=dict) + factory_prev_inputs_kwargs: dict | None = field( + default_factory=lambda: {"nscf_input": (NSCF,), "scr_input": (SCREENING,),} + ) + def get_abinit_input( + self, + structure: Structure | None = None, + pseudos: PseudoTable | None = None, + prev_outputs: list[str] | None = None, + abinit_settings: dict | None = None, + factory_kwargs: dict | None = None, + kpoints_settings: dict | KSampling | None = None, + ) -> AbinitInput: + + """Get AbinitInput object for BSE calculation.""" + if prev_outputs is None: + raise RuntimeError("No previous_outputs. Cannot perform BSE calculation.") + if len(prev_outputs) != 2: + raise RuntimeError( + "Should have exactly two previous outputs (one NSCF calculation " + "and one SCREENING calculation)." + ) + ab1 = load_abinit_input(prev_outputs[0]) + ab2 = load_abinit_input(prev_outputs[1]) + if NSCF in ab1.runlevel and SCREENING in ab2.runlevel: + nscf_inp = ab1 + scr_inp = ab2 + elif SCREENING in ab1.runlevel and NSCF in ab2.runlevel: + nscf_inp = ab2 + scr_inp = ab1 + else: + raise RuntimeError("Could not find one NSCF and one SCREENING calculation.") + + if nscf_inp.vars["ngkpt"]!=scr_inp.vars["ngkpt"]: + raise RuntimeError("Screening calculation k-grid is not compatible") + + return super().get_abinit_input( + structure=structure, + pseudos=pseudos, + prev_outputs=prev_outputs, + factory_kwargs=factory_kwargs, + kpoints_settings=kpoints_settings, + ) diff --git a/src/atomate2/abinit/sets/core.py b/src/atomate2/abinit/sets/core.py index 184c6cc673..94b23079ce 100644 --- a/src/atomate2/abinit/sets/core.py +++ b/src/atomate2/abinit/sets/core.py @@ -77,7 +77,7 @@ class NonSCFSetGenerator(AbinitInputGenerator): factory: Callable = nscf_from_gsinput pseudos: str | list[str] | PseudoTable | None = None restart_from_deps: tuple = (f"{NSCF}:WFK",) - prev_outputs_deps: tuple = (f"{SCF}:DEN",) + prev_outputs_deps: tuple = (f"{SCF}:DEN", f"{SCF}:WFK",) nbands_factor: float = 1.2 factory_kwargs: dict = field(default_factory=dict) diff --git a/src/atomate2/abinit/sets/factories.py b/src/atomate2/abinit/sets/factories.py new file mode 100644 index 0000000000..6a6cfa0e7f --- /dev/null +++ b/src/atomate2/abinit/sets/factories.py @@ -0,0 +1,32 @@ +# coding: utf-8 +"""Factory functions for Abinit input files """ +"""This code is planned to be moved into abipy/abio/factories.py""" +from __future__ import annotations + +import numpy as np +import pymatgen.io.abinit.abiobjects as aobj +from abipy.abio.inputs import AbinitInput +import abipy.core.abinit_units as abu + +def bse_with_mdf_from_inputs(nscf_input, bs_loband, nband, + mbpt_sciss=0.0, mdf_epsinf=0.0, scr_input=None, exc_type="TDA", bs_algo="haydock", accuracy="normal", spin_mode="polarized", + zcut="0.1 eV", ecuteps=3.0, coulomb_mode="model_df", bs_freq_mesh=[0.0, 10, 0.01]) -> AbinitInput: + """Return a sigma 
input.""" + + bse_input = nscf_input.deepcopy() + bse_input.pop_irdvars() + exc_ham = aobj.ExcHamiltonian(bs_loband=bs_loband, nband=nband, mbpt_sciss=mbpt_sciss*abu.eV_Ha, coulomb_mode=coulomb_mode, ecuteps=ecuteps, + spin_mode=spin_mode, mdf_epsinf=mdf_epsinf, exc_type=exc_type, algo=bs_algo, + bs_freq_mesh=np.array(bs_freq_mesh)*abu.eV_Ha, with_lf=True, zcut=zcut) + + bse_input.set_vars(exc_ham.to_abivars()) + # TODO: Cannot use istwfk != 1. + if scr_input: + bse_input.set_vars(ecuteps=scr_input["ecuteps"]) + bse_input.set_vars(bs_coulomb_term=11) + bse_input.pop_vars(["mdf_epsinf"]) + bse_input.set_vars(istwfk="*1") + bse_input.set_vars(ecutwfn=nscf_input["ecut"]) + bse_input.set_vars(bs_haydock_niter=200) + bse_input.set_vars(nband=nband) + return bse_input diff --git a/src/atomate2/abinit/sets/gw.py b/src/atomate2/abinit/sets/gw.py new file mode 100644 index 0000000000..7431eee909 --- /dev/null +++ b/src/atomate2/abinit/sets/gw.py @@ -0,0 +1,140 @@ +"""Module defining Abinit input set generators specific to GW calculations.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Callable + +from abipy.abio.factories import scr_from_nscfinput, sigma_from_inputs, scf_input, nscf_from_gsinput +from abipy.abio.input_tags import SCF, NSCF, SCREENING, SIGMA + +from atomate2.abinit.files import load_abinit_input +from atomate2.abinit.sets.base import AbinitInputGenerator +from atomate2.abinit.sets.core import NonSCFSetGenerator +from pymatgen.io.abinit.abiobjects import KSampling + +if TYPE_CHECKING: + from abipy.abio.inputs import AbinitInput + from pymatgen.core import Structure + from pymatgen.io.abinit import PseudoTable + +__all__ = [ + "ScreeningSetGenerator", + "SigmaSetGenerator", +] + + +@dataclass +class ScreeningSetGenerator(AbinitInputGenerator): + """Class to generate Abinit Screening input sets.""" + + + calc_type: str = "scr" + factory: Callable = scr_from_nscfinput + pseudos: str | list[str] | PseudoTable | None = None + prev_outputs_deps: tuple = (f"{NSCF}:WFK",) + factory_kwargs: dict = field(default_factory=dict) + factory_prev_inputs_kwargs: dict | None = field( + default_factory=lambda: {"nscf_input": (NSCF,)} + ) + + + def get_abinit_input( + self, + structure: Structure | None = None, + pseudos: PseudoTable | None = None, + prev_outputs: list[str] | None = None, + factory_kwargs: dict | None = None, + kpoints_settings: dict | KSampling | None = None, + ) -> AbinitInput: + + + nscf_inp = load_abinit_input(prev_outputs[0]) + + if factory_kwargs: + factory_kwargs.update({"ecutwfn": nscf_inp["ecut"]}) + else: + factory_kwargs={"ecutwfn": nscf_inp["ecut"]} + + return super().get_abinit_input( + structure=structure, + pseudos=pseudos, + prev_outputs=prev_outputs, + factory_kwargs=factory_kwargs, + kpoints_settings=kpoints_settings, + ) + + +@dataclass +class SigmaSetGenerator(AbinitInputGenerator): + """Class to generate Abinit Sigma input sets.""" + + calc_type: str = "sigma" + factory: Callable = sigma_from_inputs + pseudos: str | list[str] | PseudoTable | None = None + prev_outputs_deps: tuple = (f"{NSCF}:WFK", f"{SCREENING}:SCR") + factory_kwargs: dict = field(default_factory=dict) + factory_prev_inputs_kwargs: dict | None = field( + default_factory=lambda: {"nscf_input": (NSCF,), "scr_input": (SCREENING,)} + ) + + def get_abinit_input( + self, + structure: Structure | None = None, + pseudos: PseudoTable | None = None, + prev_outputs: list[str] | None = None, + abinit_settings: dict | None = None, + 
factory_kwargs: dict | None = None, + kpoints_settings: dict | KSampling | None = None, + ) -> AbinitInput: + + """Get AbinitInput object for SCR calculation.""" + if prev_outputs is None: + raise RuntimeError("No previous_outputs. Cannot perform Sigma calculation.") + if len(prev_outputs) != 2: + raise RuntimeError( + "Should have exactly two previous outputs (one NSCF calculation " + "and one SCREENING calculation)." + ) + ab1 = load_abinit_input(prev_outputs[0]) + ab2 = load_abinit_input(prev_outputs[1]) + if NSCF in ab1.runlevel and SCREENING in ab2.runlevel: + nscf_inp = ab1 + scr_inp = ab2 + elif SCREENING in ab1.runlevel and NSCF in ab2.runlevel: + nscf_inp = ab2 + scr_inp = ab1 + else: + raise RuntimeError("Could not find one NSCF and one SCREENING calculation.") + # TODO: do we need to check that the structures are the same in nscf and + # screening ? + + #previous_structure = get_final_structure(prev_outputs[0]) + # TODO: the structure in the previous abinit input may be slightly different + # from the one in the previous output (if abinit symmetrizes the structure) + # Should we set the structure in the previous_abinit_input ? Or should we + # assume that abinit will make the same symmetrization ? + # Or should we always symmetrize the structure before ? + # Or should we always set tolsym to 1.0e-8 ? + #nscf_inp.set_structure(previous_structure) + #scr_inp.set_structure(previous_structure) + if structure is not None: + if structure != previous_structure: + raise RuntimeError( + "Structure is provided in non-SCF input set generator but " + "is not the same as the one from the previous (SCF) input set." + ) + + # Sigma. + if factory_kwargs: + factory_kwargs.update({"ecutsigx": scr_inp["ecut"]}) + else: + factory_kwargs={"ecutsigx": scr_inp["ecut"]} + return super().get_abinit_input( + structure=structure, + pseudos=pseudos, + prev_outputs=prev_outputs, + factory_kwargs=factory_kwargs, + kpoints_settings=kpoints_settings, + ) + diff --git a/src/atomate2/abinit/utils/common.py b/src/atomate2/abinit/utils/common.py index 3fd19cb55b..4c3cda47e9 100644 --- a/src/atomate2/abinit/utils/common.py +++ b/src/atomate2/abinit/utils/common.py @@ -5,6 +5,8 @@ import logging import os from typing import TYPE_CHECKING +import numpy as np +from scipy.integrate import simpson if TYPE_CHECKING: from pathlib import Path @@ -430,3 +432,21 @@ def get_event_report(ofile: File, mpiabort_file: File) -> EventReport | None: return parser.report_exception(ofile.path, exc) else: return report + + +def check_convergence(old, new)-> float: + if type(old) is list: + mesh_old = old[0] + mesh_new = new[0] + values_old = np.array(old[1]) + values_new = np.array(new[1]) + values_int = np.interp(mesh_old, mesh_new, values_new) + delta = abs(values_int-values_old) + delarea = simpson(delta) + area = simpson(values_old) + conv = delarea/area + elif type(old) is float: + conv = abs(new-old) + else: + raise RuntimeError("Output not supported for convergence check") + return conv diff --git a/tests/test_data/vasp/Si_elastic/elastic_relax_1_6/outputs/transformations.json.gz b/tests/test_data/vasp/Si_elastic/elastic_relax_1_6/outputs/transformations.json.gz new file mode 100644 index 0000000000..5b648c8094 Binary files /dev/null and b/tests/test_data/vasp/Si_elastic/elastic_relax_1_6/outputs/transformations.json.gz differ diff --git a/tests/test_data/vasp/Si_elastic/elastic_relax_2_6/outputs/transformations.json.gz b/tests/test_data/vasp/Si_elastic/elastic_relax_2_6/outputs/transformations.json.gz new file mode 100644 index 
0000000000..9645ef9ab9
Binary files /dev/null and b/tests/test_data/vasp/Si_elastic/elastic_relax_2_6/outputs/transformations.json.gz differ
diff --git a/tests/test_data/vasp/Si_elastic/elastic_relax_3_6/outputs/transformations.json.gz b/tests/test_data/vasp/Si_elastic/elastic_relax_3_6/outputs/transformations.json.gz
new file mode 100644
index 0000000000..41f0001fc9
Binary files /dev/null and b/tests/test_data/vasp/Si_elastic/elastic_relax_3_6/outputs/transformations.json.gz differ
diff --git a/tests/test_data/vasp/Si_elastic/elastic_relax_4_6/outputs/transformations.json.gz b/tests/test_data/vasp/Si_elastic/elastic_relax_4_6/outputs/transformations.json.gz
new file mode 100644
index 0000000000..de8d86ac20
Binary files /dev/null and b/tests/test_data/vasp/Si_elastic/elastic_relax_4_6/outputs/transformations.json.gz differ
diff --git a/tests/test_data/vasp/Si_elastic/elastic_relax_5_6/outputs/transformations.json.gz b/tests/test_data/vasp/Si_elastic/elastic_relax_5_6/outputs/transformations.json.gz
new file mode 100644
index 0000000000..cfd5812ba5
Binary files /dev/null and b/tests/test_data/vasp/Si_elastic/elastic_relax_5_6/outputs/transformations.json.gz differ
diff --git a/tests/test_data/vasp/Si_elastic/elastic_relax_6_6/outputs/transformations.json.gz b/tests/test_data/vasp/Si_elastic/elastic_relax_6_6/outputs/transformations.json.gz
new file mode 100644
index 0000000000..b4608bde14
Binary files /dev/null and b/tests/test_data/vasp/Si_elastic/elastic_relax_6_6/outputs/transformations.json.gz differ
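A minimal usage sketch for the flow makers introduced above. The maker names, fields and module paths are the ones defined in this patch; the silicon structure, the convergence settings and the `run_locally` call are illustrative placeholders, and actually executing the flows requires a working abinit installation with pseudopotentials configured.

```python
from jobflow import run_locally
from pymatgen.core import Lattice, Structure

from atomate2.abinit.flows.bse import BSEFlowMaker, BSEMultiShiftedMaker
from atomate2.abinit.flows.gw import G0W0ConvergenceMaker, G0W0Maker

# Conventional diamond-cubic silicon cell, purely for illustration.
structure = Structure(
    Lattice.cubic(5.43),
    ["Si"] * 8,
    [
        [0.00, 0.00, 0.00], [0.25, 0.25, 0.25],
        [0.00, 0.50, 0.50], [0.25, 0.75, 0.75],
        [0.50, 0.00, 0.50], [0.75, 0.25, 0.75],
        [0.50, 0.50, 0.00], [0.75, 0.75, 0.25],
    ],
)

# Full scf -> nscf -> screening -> sigma chain, QP corrections at the gap only.
gw_flow = G0W0Maker(gw_qprange=0).make(structure)

# Converge the band gap with respect to the number of bands (an NSCF-level field).
gw_conv_flow = G0W0ConvergenceMaker(
    criterion_name="bandgap",
    epsilon=0.05,
    convergence_field="nband",
    convergence_steps=[50, 100, 150],
).make(structure)

# BSE absorption spectrum averaged over three shifted k-grids (model dielectric
# function, so mdf_epsinf must be given to the inner BSEFlowMaker).
bse_flow = BSEMultiShiftedMaker(
    bse_maker=BSEFlowMaker(kppa=1500, mdf_epsinf=12.0, enwinbse=4.0),
    shiftks=[(0.11, 0.21, 0.31), (0.21, 0.31, 0.41), (0.31, 0.41, 0.51)],
).make(structure)

# Any jobflow manager can execute the flows; run_locally is just the simplest.
responses = run_locally(gw_flow, create_folders=True)
```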
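The spectrum averaging in `calc_average_spectra` and the area-based convergence measure in `check_convergence` are plain NumPy/SciPy operations and can be sanity-checked in isolation. The sketch below mirrors that logic on synthetic Gaussian spectra; note the production code calls `simpson` without an explicit energy mesh, which gives the same ratio on a uniform mesh, whereas here the mesh is passed explicitly.

```python
import numpy as np
from scipy.integrate import simpson


def average_spectra(spectra: list[list[np.ndarray]]) -> list[np.ndarray]:
    """Average [mesh, values] pairs on the mesh of the first spectrum."""
    mesh0, total = spectra[0][0], np.array(spectra[0][1], dtype=float)
    for mesh, values in spectra[1:]:
        # Interpolate every spectrum onto the reference mesh before summing.
        total += np.interp(mesh0, mesh, values)
    return [mesh0, total / len(spectra)]


def spectra_distance(old: list[np.ndarray], new: list[np.ndarray]) -> float:
    """Integrated |new - old| relative to the integrated old spectrum."""
    mesh_old, values_old = old
    values_new = np.interp(mesh_old, new[0], new[1])
    return simpson(np.abs(values_new - values_old), x=mesh_old) / simpson(
        values_old, x=mesh_old
    )


# Synthetic absorption-like spectra on slightly different meshes (illustration only).
mesh_a = np.linspace(0.0, 10.0, 501)
mesh_b = np.linspace(0.0, 10.0, 601)
eps2_a = np.exp(-((mesh_a - 4.0) ** 2) / 0.5)
eps2_b = np.exp(-((mesh_b - 4.1) ** 2) / 0.5)

avg = average_spectra([[mesh_a, eps2_a], [mesh_b, eps2_b]])
print("relative change:", spectra_distance([mesh_a, eps2_a], [mesh_b, eps2_b]))
print("peak of averaged spectrum:", avg[1].max())
```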
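The `ConvergenceMaker` and `ConvergenceSummary` communicate through a small `convergence.json` file written into each iteration's output directory. A sketch of its contents after two iterations of a hypothetical `kppa` study of the band gap is shown below; the keys are the ones written by `update_convergence_file` and read back by `ConvergenceSummary.from_abinit_calc_doc`, while the numbers are made up.

```python
import json

# Illustrative convergence.json after two iterations; "converged" flips to True
# once check_convergence(last, previous) drops below asked_epsilon.
convergence_data = {
    "criterion_name": "bandgap",
    "asked_epsilon": 0.1,
    "criterion_values": [1.15, 1.09],
    "convergence_field_name": "kppa",
    "convergence_field_values": [500, 1000],
    "converged": True,
    "idx": 1,
}

with open("convergence.json", "w") as f:
    json.dump(convergence_data, f)

# The summary document reports the last criterion/field values and the
# actual epsilon that was reached.
with open("convergence.json") as f:
    data = json.load(f)
actual_epsilon = abs(data["criterion_values"][-1] - data["criterion_values"][-2])
print(data["converged"], actual_epsilon < data["asked_epsilon"])
```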