Plugin system

Introduction

Assuming the following package structure:

your_package
├── pyproject.toml        # or setup.py
└── src
    └── your_module
        ├── __init__.py
        ├── your_plugins.py
        └── ...

where the ert plugins are defined in your_plugins.py, then discovery is done by registering your plugin via a setuptools entry point, with the namespace ert:

# setup.py
setup(
    ...
    entry_points={"ert": ["your_module_jobs = your_module.your_plugins"]},
    ...
)
# pyproject.toml
[project.entry-points.ert]
your_module_jobs = "your_module.your_plugins"

This entry point should point to the module where your ert plugin(s) exists. (Notice that the entry point expects a list, so you can register multiple modules).

Kinds of plugins

A plugin is created with the ert.plugin decorator and the function name describes what kind of plugin it is.

Forward models

To install forward model steps that you want to have available in ERT you can use installable_forward_model_steps.

from typing import Optional

import ert

class MyForwardModel(ert.ForwardModelStepPlugin):
    def __init__(self):
        super().__init__(
            name="MY_FORWARD_MODEL",
            command=["my_executable", "<parameter1>", "<parameter2>"],
        )

    def validate_pre_realization_run(
        self, fm_step_json: ert.ForwardModelStepJSON
    ) -> ert.ForwardModelStepJSON:
        if fm_step_json["argList"][0] not in ["start", "stop"]:
            raise ert.ForwardModelStepValidationError(
                "First argument to MY_FORWARD_MODEL must be either start or stop"
            )
        return fm_step_json

    def validate_pre_experiment(self, fm_step_json: ert.ForwardModelStepJSON) -> None:
        pass

    @staticmethod
    def documentation() -> Optional[ert.ForwardModelStepDocumentation]:
        return ert.ForwardModelStepDocumentation(
            category="utility.templating",
            source_package="my_plugin",
            source_function_name="MyForwardModel",
            description="my plugin description",
        )


@ert.plugin(name="my_plugin")
def installable_forward_model_steps() -> list[ert.ForwardModelStepPlugin]:
    return [MyForwardModel]

Notice that by using installable_forward_model_steps, validation can be added where the methods validate_pre_experiment or validate_pre_realization_run can throw ForwardModelStepValidationError to indicate that the configuration of the forward model step is invalid (which ert then handles gracefully and presents nicely to the user). If you want to show a warning in cases where the configuration cannot be validated pre-experiment, you can use the ForwardModelStepWarning.warn(...) method.

import ert

@ert.plugin(name="my_plugin")
def job_documentation(step_name: str):
    if step_name == "my_step":
         return {
             "description": "step description",
             "examples": "...",
             "category": "test.category.for.step",
         }

When creating documentation in ERT, forward model steps will be grouped by their main categories (ie. the category listed before the first dot).

Forward model configuration

Forward model steps can be configured through the plugin system by using the forward_model_configuration hook. For each forward model step name in the config dict, a set of configuration parameters specific to that forward model can be specified. These configurations will be injected as environment variables when starting the forward model, and will be isolated from other forward models.

import ert

@ert.plugin(name="my_plugin")
def forward_model_configuration():
     return {
         "forward_model_step_name": {
             "config_key" : <config value>
         },
     }
  • <config value> can be of any python type, but will be exported to env by calling str(<config value>).

  • config_key will always be exposed as upper case environment variables.

Workflow job

There are two ways to install workflow jobs in ERT. Depending on whether you already have a configuration file or need to include additional documentation, you can choose between the installable_workflow_jobs hook or the ertscript_workflow hook.

  1. Using the installable_workflow_jobs hook

The hook is specified as follows:

import ert

@ert.plugin(name="my_plugin")
def installable_workflow_jobs():
     return {
         "wf_job_name": "/path/to/workflow_job.config",
     }

The configuration file needed to use the installable_workflow_jobs hook must point to an executable and specify its arguments. The built-in internal CSV_EXPORT workflow job is shown as an example:

INTERNAL    True
SCRIPT      /path/to/csv_export.py
MIN_ARG     1
MAX_ARG     2
ARG_TYPE    0  STRING
ARG_TYPE    1  STRING

Implement the hook specification as follows to register the workflow job CSV_EXPORT:

import ert

@ert.plugin(name="ert")
def installable_workflow_jobs() -> Dict[str, str]:
     return {
         "CSV_EXPORT": "/path/to/csv_export"
     }
  1. Using the ertscript_workflow hook

The second approach does not require creating a workflow job configuration file up-front, and allows adding documentation.

@no_type_check
@hook_specification
def ertscript_workflow(config: WorkflowConfigs) -> None:
    """
    This hook allows the user to register a workflow with the config object. A workflow
    must add the class inheriting from ErtScript and an optional name.

    :param config: A handle to the main workflow config.
    """

Minimal example:

import ert

class MyJob(ert.ErtScript):
    def run(self):
        print("Hello World")

@ert.plugin(name="my_plugin")
 def ertscript_workflow(config):
     config.add_workflow(MyJob, "MY_JOB")

Full example:

import ert

class MyJob(ert.ErtScript):
    def run(self):
        print("Hello World")

@ert.plugin(name="my_plugin")
 def ertscript_workflow(config: ert.WorkflowConfigs):
     config.add_workflow(
         MyJob,
         "MY_JOB",
         parser=my_job_parser,
         description="My job description", # optional
         examples="example of use", # optional
     )

The configuration object and properties are as follows.

ert.plugins.hook_specifications.jobs.ertscript_workflow(config: WorkflowConfigs) None

This hook allows the user to register a workflow with the config object. A workflow must add the class inheriting from ErtScript and an optional name.

Parameters:

config – A handle to the main workflow config.

Ert - Ensemble Reservoir Tool - a package for reservoir modeling.

class ert.ErtScript

ErtScript is the abstract baseclass for workflow jobs and plugins. It provides access to the ert internals and lets jobs implement the “run” function which is called when a workflow is executed.

cleanup() None

Override to perform cleanup after a run.

abstractmethod run(*arg: Any, **kwarg: Any) Any

This method is implemented by the workflow runners and executed when the workflow job is called.

The parameters are gotten from the workflow file, e.g. a workflow file containing

EXPORT_MISFIT_DATA path/to/output.hdf

will put path/to/output.hdf in the first argument to run.

static validate(args: list[Any]) None

If the workflow has problems it can validate against the arguments on startup. If it raises ConfigValidationError this will be caught and presented to the user.

class ert.WorkflowConfigs

Top level workflow config object, holds all workflow configs.

__init__() None
classmethod __new__(*args, **kwargs)
add_workflow(ert_script: type[ErtScript], name: str = '', description: str = '', examples: str | None = None, parser: Callable[[], ArgumentParser] | None = None, category: str = 'other') None
Parameters:
  • category – dot separated string

  • parser – will extract information to use in documentation

  • examples – must be valid rst, will be added to documentation

  • description – must be valid rst, defaults to __doc__

  • ert_script – class which inherits from ErtScript

  • name – Optional name for workflow (default is name of class)

Logging configuration

The logging can be configured by plugins to add custom log handlers.

ert.plugins.hook_specifications.logging.add_log_handle_to_root() Handler

Create a log handle which will be added to the root logger in the main entry point.

Returns:

A log handle that will be added to the root logger

Minimal example to log to a new file:

import ert

 @ert.plugin(name="my_plugin")
 def add_log_handle_to_root():
     import logging
     fh = logging.FileHandler('spam.log')
     formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
     fh.setFormatter(formatter)
     return fh

IP address configuration

The way Ert chooses IP addresses for communicating with cluster jobs can be configured by plugins.

ert.plugins.hook_specifications.net_utils.get_ip_address() str

Ert uses network communication over TCP/IP and needs to provide potential network clients with which IP address is to be used to contact the main Ert process. By default, Ert will check the operating system routing table to get the IP address in use for non-localhost connections.

On machines exposing several IP addresses, the correct IP is non-trivial to pick, and by specifying this hook in an installed plugin, any custom code can be injected in order to pick the correct IP.

Minimal example to override how Ert chooses the IP address:

import ert

 @ert.plugin(name="my_plugin")
 def get_ip_address():
     return "127.0.0.1"