diff --git a/certbot/src/certbot/_internal/storage.py b/certbot/src/certbot/_internal/storage.py index ab6a36dea..2e1474890 100644 --- a/certbot/src/certbot/_internal/storage.py +++ b/certbot/src/certbot/_internal/storage.py @@ -105,55 +105,59 @@ def add_time_interval(base_time: datetime.datetime, interval: str, return textparser.parseDT(interval, base_time, tzinfo=tzinfo)[0] -def write_renewal_config(o_filename: str, n_filename: str, archive_dir: str, - target: Mapping[str, str], - relevant_data: Mapping[str, Any]) -> configobj.ConfigObj: - """Writes a renewal config file with the specified name and values. +def create_renewal_config_file(filename: str, archive_dir: str, + target: Mapping[str, str], + cli_config: configuration.NamespaceConfig) -> None: + """Creates a renewal config file with the specified name and values. - :param str o_filename: Absolute path to the previous version of config file - :param str n_filename: Absolute path to the new destination of config file + :param str filename: Absolute path to the new destination of config file :param str archive_dir: Absolute path to the archive directory :param dict target: Maps ALL_FOUR to their symlink paths - :param dict relevant_data: Renewal configuration options to save - - :returns: Configuration object for the new config file - :rtype: configobj.ConfigObj - + :param .NamespaceConfig cli_config: parsed command line + arguments """ - config = configobj.ConfigObj(o_filename, encoding='utf-8', default_encoding='utf-8') - config["version"] = certbot.__version__ - config["archive_dir"] = archive_dir - for kind in ALL_FOUR: - config[kind] = target[kind] - - if "renewalparams" not in config: - config["renewalparams"] = {} - config.comments["renewalparams"] = ["", - "Options used in " - "the renewal process"] - - config["renewalparams"].update(relevant_data) - - for k in config["renewalparams"]: - if k not in relevant_data: - del config["renewalparams"][k] - - # TODO: add human-readable comments explaining other available - # parameters - logger.debug("Writing new config %s.", n_filename) - - # Ensure that the file exists - with open(n_filename, 'a'): - pass - - # Copy permissions from the old version of the file, if it exists. - if os.path.exists(o_filename): - current_permissions = stat.S_IMODE(os.lstat(o_filename).st_mode) - filesystem.chmod(n_filename, current_permissions) - - with open(n_filename, "wb") as f: + config = make_renewal_configobj(archive_dir, target, cli_config) + with open(filename, "wb") as f: config.write(outfile=f) - return config + + +def atomic_rewrite(config_filename: str, new_config: configobj.ConfigObj) -> None: + """Update a config file on disk with the provided values. + + The update will be atomic: if writing fails, the original file will not be modified. The + updated file will preserve comments from the original and will have the same permissions. + + In general, fields that exist on disk but not in new_config will be preserved. As a special + case, fields in the 'renewalparams' section will be deleted unless they exist in new_config. + This deletion clears out old fields from when we used to dump _all_ flags (as opposed to + relevant ones). + + :param str config_filename: Absolute path to the configuration file + :param configobj.ConfigObj new_config: New configuration object + """ + # Read the existing values from `config_filename`, or error (file_error=True) + merged_config = configobj.ConfigObj(config_filename, encoding='utf-8', default_encoding='utf-8', + file_error=True) + merged_config.merge(new_config) + + # We merge and then delete, rather than just replacing the 'renewalparams' section, so we can + # preserve comments from the on-disk config file. + for k in merged_config["renewalparams"]: + if k not in new_config["renewalparams"]: + del merged_config["renewalparams"][k] + + current_permissions = stat.S_IMODE(os.lstat(config_filename).st_mode) + + temp_filename = config_filename + ".new" + + # If an existing tempfile exists, delete it + if os.path.exists(temp_filename): + os.unlink(temp_filename) + + with util.safe_open(temp_filename, "wb", current_permissions) as f: + merged_config.write(outfile=f) + + filesystem.replace(temp_filename, config_filename) def rename_renewal_config(prev_name: str, new_name: str, @@ -182,28 +186,42 @@ def update_configuration(lineagename: str, archive_dir: str, target: Mapping[str :param str lineagename: Name of the lineage being modified :param str archive_dir: Absolute path to the archive directory :param dict target: Maps ALL_FOUR to their symlink paths - :param .NamespaceConfig cli_config: parsed command line - arguments + :param .NamespaceConfig cli_config: parsed command line arguments :returns: Configuration object for the updated config file :rtype: configobj.ConfigObj """ + config = make_renewal_configobj(archive_dir, target, cli_config) + config_filename = renewal_filename_for_lineagename(cli_config, lineagename) - temp_filename = config_filename + ".new" - # If an existing tempfile exists, delete it - if os.path.exists(temp_filename): - os.unlink(temp_filename) - - # Save only the config items that are relevant to renewal - values = relevant_values(cli_config) - write_renewal_config(config_filename, temp_filename, archive_dir, target, values) - filesystem.replace(temp_filename, config_filename) + atomic_rewrite(config_filename, config) return configobj.ConfigObj(config_filename, encoding='utf-8', default_encoding='utf-8') +def make_renewal_configobj(archive_dir: str, target: Mapping[str, str], + cli_config: configuration.NamespaceConfig) -> configobj.ConfigObj: + """Create a configobj.ConfigObj representing the provided values. + + :param str archive_dir: Absolute path to the archive directory + :param dict target: Maps ALL_FOUR to their symlink paths + :param .NamespaceConfig cli_config: parsed command line arguments + + :returns: Configuration object representing the inputs + :rtype: configobj.ConfigObj + """ + config = configobj.ConfigObj(encoding='utf-8', default_encoding='utf-8') + config["version"] = certbot.__version__ + config["archive_dir"] = archive_dir + for kind in ALL_FOUR: + config[kind] = target[kind] + config['renewalparams'] = {} + config['renewalparams'].update(relevant_values(cli_config)) + return config + + def get_link_target(link: str) -> str: """Get an absolute path to the target of link. @@ -1045,12 +1063,8 @@ class RenewableCert(interfaces.RenewableCert): # Document what we've done in a new renewal config file config_file.close() - # Save only the config items that are relevant to renewal - values = relevant_values(cli_config) - - new_config = write_renewal_config(config_filename, config_filename, archive, - target, values) - return cls(new_config.filename, cli_config) + create_renewal_config_file(config_filename, archive, target, cli_config) + return cls(config_filename, cli_config) def _private_key(self) -> Union[RSAPrivateKey, EllipticCurvePrivateKey]: with open(self.configuration["privkey"], "rb") as priv_key_file: diff --git a/certbot/src/certbot/_internal/tests/storage_test.py b/certbot/src/certbot/_internal/tests/storage_test.py index ffdaa4848..64673a366 100644 --- a/certbot/src/certbot/_internal/tests/storage_test.py +++ b/certbot/src/certbot/_internal/tests/storage_test.py @@ -745,26 +745,27 @@ class RenewableCertTests(BaseRenewableCertTest): with pytest.raises(errors.CertStorageError): storage.RenewableCert(self.config_file.filename, self.config) - def test_write_renewal_config(self): + def test_atomic_rewrite(self): # Mostly tested by the process of creating and updating lineages, # but we can test that this successfully creates files, removes - # unneeded items, and preserves comments. + # unneeded items, preserves permissions, and preserves comments. temp = os.path.join(self.config.config_dir, "sample-file") - temp2 = os.path.join(self.config.config_dir, "sample-file.new") with open(temp, "w") as f: f.write("[renewalparams]\nuseful = value # A useful value\n" "useless = value # Not needed\n") filesystem.chmod(temp, 0o640) - target = {} + perms = stat.S_IMODE(os.lstat(temp).st_mode) + config = configobj.ConfigObj() for x in ALL_FOUR: - target[x] = "somewhere" - archive_dir = "the_archive" - relevant_data = {"useful": "new_value"} + config[x] = "somewhere" + config["version"] = certbot.__version__ + config["archive_dir"] = "the_archive" + config["renewalparams"] = {"useful": "new_value"} from certbot._internal import storage - storage.write_renewal_config(temp, temp2, archive_dir, target, relevant_data) + storage.atomic_rewrite(temp, config) - with open(temp2, "r") as f: + with open(temp, "r") as f: content = f.read() # useful value was updated assert "useful = new_value" in content @@ -775,8 +776,7 @@ class RenewableCertTests(BaseRenewableCertTest): # check version was stored assert "version = {0}".format(certbot.__version__) in content # ensure permissions are copied - assert stat.S_IMODE(os.lstat(temp).st_mode) == \ - stat.S_IMODE(os.lstat(temp2).st_mode) + assert stat.S_IMODE(os.lstat(temp).st_mode) == perms def test_truncate(self): # It should not do anything when there's less than 5 cert history