storage: factor out atomic update operation (#10409)

The responsibility for atomic updates to a config file was previously
spread across different functions. This moves the atomic update to the
lowest level of the call graph.

Also, factor out the code that creates a ConfigObj based on various
inputs (archive dir, cert locations, and renewal params). This allows
cleanly reusing it across the "update" and "create new" paths.

Now the "create new config" code path doesn't have to do any renaming,
and the "update config" code path can assume the input file exists (and
error if it doesn't).

This will make it easier and cleaner to reuse the config-writing code in
#10377.

Part of #10355
This commit is contained in:
Jacob Hoffman-Andrews 2025-08-13 11:53:37 -07:00 committed by GitHub
parent 34a128ae88
commit bd796deaa3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 86 additions and 72 deletions

View file

@ -105,55 +105,59 @@ def add_time_interval(base_time: datetime.datetime, interval: str,
return textparser.parseDT(interval, base_time, tzinfo=tzinfo)[0]
def write_renewal_config(o_filename: str, n_filename: str, archive_dir: str,
target: Mapping[str, str],
relevant_data: Mapping[str, Any]) -> configobj.ConfigObj:
"""Writes a renewal config file with the specified name and values.
def create_renewal_config_file(filename: str, archive_dir: str,
target: Mapping[str, str],
cli_config: configuration.NamespaceConfig) -> None:
"""Creates a renewal config file with the specified name and values.
:param str o_filename: Absolute path to the previous version of config file
:param str n_filename: Absolute path to the new destination of config file
:param str filename: Absolute path to the new destination of config file
:param str archive_dir: Absolute path to the archive directory
:param dict target: Maps ALL_FOUR to their symlink paths
:param dict relevant_data: Renewal configuration options to save
:returns: Configuration object for the new config file
:rtype: configobj.ConfigObj
:param .NamespaceConfig cli_config: parsed command line
arguments
"""
config = configobj.ConfigObj(o_filename, encoding='utf-8', default_encoding='utf-8')
config["version"] = certbot.__version__
config["archive_dir"] = archive_dir
for kind in ALL_FOUR:
config[kind] = target[kind]
if "renewalparams" not in config:
config["renewalparams"] = {}
config.comments["renewalparams"] = ["",
"Options used in "
"the renewal process"]
config["renewalparams"].update(relevant_data)
for k in config["renewalparams"]:
if k not in relevant_data:
del config["renewalparams"][k]
# TODO: add human-readable comments explaining other available
# parameters
logger.debug("Writing new config %s.", n_filename)
# Ensure that the file exists
with open(n_filename, 'a'):
pass
# Copy permissions from the old version of the file, if it exists.
if os.path.exists(o_filename):
current_permissions = stat.S_IMODE(os.lstat(o_filename).st_mode)
filesystem.chmod(n_filename, current_permissions)
with open(n_filename, "wb") as f:
config = make_renewal_configobj(archive_dir, target, cli_config)
with open(filename, "wb") as f:
config.write(outfile=f)
return config
def atomic_rewrite(config_filename: str, new_config: configobj.ConfigObj) -> None:
"""Update a config file on disk with the provided values.
The update will be atomic: if writing fails, the original file will not be modified. The
updated file will preserve comments from the original and will have the same permissions.
In general, fields that exist on disk but not in new_config will be preserved. As a special
case, fields in the 'renewalparams' section will be deleted unless they exist in new_config.
This deletion clears out old fields from when we used to dump _all_ flags (as opposed to
relevant ones).
:param str config_filename: Absolute path to the configuration file
:param configobj.ConfigObj new_config: New configuration object
"""
# Read the existing values from `config_filename`, or error (file_error=True)
merged_config = configobj.ConfigObj(config_filename, encoding='utf-8', default_encoding='utf-8',
file_error=True)
merged_config.merge(new_config)
# We merge and then delete, rather than just replacing the 'renewalparams' section, so we can
# preserve comments from the on-disk config file.
for k in merged_config["renewalparams"]:
if k not in new_config["renewalparams"]:
del merged_config["renewalparams"][k]
current_permissions = stat.S_IMODE(os.lstat(config_filename).st_mode)
temp_filename = config_filename + ".new"
# If an existing tempfile exists, delete it
if os.path.exists(temp_filename):
os.unlink(temp_filename)
with util.safe_open(temp_filename, "wb", current_permissions) as f:
merged_config.write(outfile=f)
filesystem.replace(temp_filename, config_filename)
def rename_renewal_config(prev_name: str, new_name: str,
@ -182,28 +186,42 @@ def update_configuration(lineagename: str, archive_dir: str, target: Mapping[str
:param str lineagename: Name of the lineage being modified
:param str archive_dir: Absolute path to the archive directory
:param dict target: Maps ALL_FOUR to their symlink paths
:param .NamespaceConfig cli_config: parsed command line
arguments
:param .NamespaceConfig cli_config: parsed command line arguments
:returns: Configuration object for the updated config file
:rtype: configobj.ConfigObj
"""
config = make_renewal_configobj(archive_dir, target, cli_config)
config_filename = renewal_filename_for_lineagename(cli_config, lineagename)
temp_filename = config_filename + ".new"
# If an existing tempfile exists, delete it
if os.path.exists(temp_filename):
os.unlink(temp_filename)
# Save only the config items that are relevant to renewal
values = relevant_values(cli_config)
write_renewal_config(config_filename, temp_filename, archive_dir, target, values)
filesystem.replace(temp_filename, config_filename)
atomic_rewrite(config_filename, config)
return configobj.ConfigObj(config_filename, encoding='utf-8', default_encoding='utf-8')
def make_renewal_configobj(archive_dir: str, target: Mapping[str, str],
cli_config: configuration.NamespaceConfig) -> configobj.ConfigObj:
"""Create a configobj.ConfigObj representing the provided values.
:param str archive_dir: Absolute path to the archive directory
:param dict target: Maps ALL_FOUR to their symlink paths
:param .NamespaceConfig cli_config: parsed command line arguments
:returns: Configuration object representing the inputs
:rtype: configobj.ConfigObj
"""
config = configobj.ConfigObj(encoding='utf-8', default_encoding='utf-8')
config["version"] = certbot.__version__
config["archive_dir"] = archive_dir
for kind in ALL_FOUR:
config[kind] = target[kind]
config['renewalparams'] = {}
config['renewalparams'].update(relevant_values(cli_config))
return config
def get_link_target(link: str) -> str:
"""Get an absolute path to the target of link.
@ -1045,12 +1063,8 @@ class RenewableCert(interfaces.RenewableCert):
# Document what we've done in a new renewal config file
config_file.close()
# Save only the config items that are relevant to renewal
values = relevant_values(cli_config)
new_config = write_renewal_config(config_filename, config_filename, archive,
target, values)
return cls(new_config.filename, cli_config)
create_renewal_config_file(config_filename, archive, target, cli_config)
return cls(config_filename, cli_config)
def _private_key(self) -> Union[RSAPrivateKey, EllipticCurvePrivateKey]:
with open(self.configuration["privkey"], "rb") as priv_key_file:

View file

@ -745,26 +745,27 @@ class RenewableCertTests(BaseRenewableCertTest):
with pytest.raises(errors.CertStorageError):
storage.RenewableCert(self.config_file.filename, self.config)
def test_write_renewal_config(self):
def test_atomic_rewrite(self):
# Mostly tested by the process of creating and updating lineages,
# but we can test that this successfully creates files, removes
# unneeded items, and preserves comments.
# unneeded items, preserves permissions, and preserves comments.
temp = os.path.join(self.config.config_dir, "sample-file")
temp2 = os.path.join(self.config.config_dir, "sample-file.new")
with open(temp, "w") as f:
f.write("[renewalparams]\nuseful = value # A useful value\n"
"useless = value # Not needed\n")
filesystem.chmod(temp, 0o640)
target = {}
perms = stat.S_IMODE(os.lstat(temp).st_mode)
config = configobj.ConfigObj()
for x in ALL_FOUR:
target[x] = "somewhere"
archive_dir = "the_archive"
relevant_data = {"useful": "new_value"}
config[x] = "somewhere"
config["version"] = certbot.__version__
config["archive_dir"] = "the_archive"
config["renewalparams"] = {"useful": "new_value"}
from certbot._internal import storage
storage.write_renewal_config(temp, temp2, archive_dir, target, relevant_data)
storage.atomic_rewrite(temp, config)
with open(temp2, "r") as f:
with open(temp, "r") as f:
content = f.read()
# useful value was updated
assert "useful = new_value" in content
@ -775,8 +776,7 @@ class RenewableCertTests(BaseRenewableCertTest):
# check version was stored
assert "version = {0}".format(certbot.__version__) in content
# ensure permissions are copied
assert stat.S_IMODE(os.lstat(temp).st_mode) == \
stat.S_IMODE(os.lstat(temp2).st_mode)
assert stat.S_IMODE(os.lstat(temp).st_mode) == perms
def test_truncate(self):
# It should not do anything when there's less than 5 cert history