Merge pull request #10084 from atombrella/pyupgrade/up024_oserror

Replace aliased OSError.
This commit is contained in:
Will Greenberg 2024-12-17 14:34:10 -08:00 committed by GitHub
commit 5fca4a14ab
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 46 additions and 47 deletions

View file

@ -193,7 +193,7 @@ class BaseDualNetworkedServersTest(unittest.TestCase):
from acme.standalone import BaseDualNetworkedServers
mock_bind.side_effect = socket.error(EADDRINUSE, "Fake addr in use error")
mock_bind.side_effect = OSError(EADDRINUSE, "Fake addr in use error")
with pytest.raises(socket.error) as exc_info:
BaseDualNetworkedServers(

View file

@ -134,7 +134,7 @@ class SSLSocket: # pylint: disable=too-few-public-methods
# in the standard library. This is useful when this object is
# used by code which expects a standard socket such as
# socketserver in the standard library.
raise socket.error(error)
raise OSError(error)
def accept(self) -> Tuple[FakeConnection, Any]: # pylint: disable=missing-function-docstring
sock, addr = self.sock.accept()
@ -158,7 +158,7 @@ class SSLSocket: # pylint: disable=too-few-public-methods
except SSL.Error as error:
# _pick_certificate_cb might have returned without
# creating SSL context (wrong server name)
raise socket.error(error)
raise OSError(error)
return ssl_sock, addr
except:
@ -206,7 +206,7 @@ def probe_sni(name: bytes, host: bytes, port: int = 443, timeout: int = 300, #
)
socket_tuple: Tuple[bytes, int] = (host, port)
sock = socket.create_connection(socket_tuple, **socket_kwargs) # type: ignore[arg-type]
except socket.error as error:
except OSError as error:
raise errors.Error(error)
with contextlib.closing(sock) as client:

View file

@ -98,7 +98,7 @@ class BaseDualNetworkedServers:
logger.debug(
"Successfully bound to %s:%s using %s", new_address[0],
new_address[1], "IPv6" if ip_version else "IPv4")
except socket.error as e:
except OSError as e:
last_socket_err = e
if self.servers:
# Already bound using IPv6.
@ -121,7 +121,7 @@ class BaseDualNetworkedServers:
if last_socket_err:
raise last_socket_err
else: # pragma: no cover
raise socket.error("Could not bind to IPv4 or IPv6.")
raise OSError("Could not bind to IPv4 or IPv6.")
def serve_forever(self) -> None:
"""Wraps socketserver.TCPServer.serve_forever"""

View file

@ -295,7 +295,7 @@ class ApacheConfigurator(common.Configurator):
try:
with open(ssl_module_location, mode="rb") as f:
contents = f.read()
except IOError as error:
except OSError as error:
logger.debug(str(error), exc_info=True)
return None
return contents
@ -928,7 +928,7 @@ class ApacheConfigurator(common.Configurator):
try:
socket.inet_aton(addr.get_addr())
return socket.gethostbyaddr(addr.get_addr())[0]
except (socket.error, socket.herror, socket.timeout):
except (OSError, socket.herror, socket.timeout):
pass
return ""
@ -1523,7 +1523,7 @@ class ApacheConfigurator(common.Configurator):
# activation (it's not included as default)
if not self.parser.parsed_in_current(ssl_fp):
self.parser.parse_file(ssl_fp)
except IOError:
except OSError:
logger.critical("Error writing/reading to file in make_vhost_ssl", exc_info=True)
raise errors.PluginError("Unable to write/read in make_vhost_ssl")

View file

@ -153,7 +153,7 @@ class ApacheParser:
try:
# This is a noop save
self.aug.save()
except (RuntimeError, IOError):
except (OSError, RuntimeError):
self._log_save_errors(ex_errs)
# Erase Save Notes
self.configurator.save_notes = ""
@ -198,7 +198,7 @@ class ApacheParser:
ex_errs = self.aug.match("/augeas//error")
try:
self.aug.save()
except IOError:
except OSError:
self._log_save_errors(ex_errs)
raise

View file

@ -686,7 +686,7 @@ class NginxConfigurator(common.Configurator):
else:
socket.inet_pton(socket.AF_INET, host)
all_names.add(socket.gethostbyaddr(host)[0])
except (socket.error, socket.herror, socket.timeout):
except (OSError, socket.herror, socket.timeout):
continue
return util.get_filtered_names(all_names)

View file

@ -215,7 +215,7 @@ class NginxParser:
parsed = nginxparser.load(_file)
self.parsed[item] = parsed
trees.append(parsed)
except IOError:
except OSError:
logger.warning("Could not open file: %s", item)
except UnicodeDecodeError:
logger.warning("Could not read file: %s due to invalid "
@ -258,7 +258,7 @@ class NginxParser:
with io.open(filename, 'w', encoding='utf-8') as _file:
_file.write(out)
except IOError:
except OSError:
logger.error("Could not open file for writing: %s", filename)
def parse_server(self, server: UnspacedList) -> Dict[str, Any]:
@ -433,7 +433,7 @@ def _parse_ssl_options(ssl_options: Optional[str]) -> List[UnspacedList]:
try:
with io.open(ssl_options, "r", encoding="utf-8") as _file:
return nginxparser.load(_file)
except IOError:
except OSError:
logger.warning("Missing NGINX TLS options file: %s", ssl_options)
except UnicodeDecodeError:
logger.warning("Could not read file: %s due to invalid character. "

View file

@ -223,7 +223,7 @@ class AccountFileStorage(interfaces.AccountStorage):
key = jose.JWK.json_loads(key_file.read())
with open(self._metadata_path(account_dir_path)) as metadata_file:
meta = Account.Meta.json_loads(metadata_file.read())
except IOError as error:
except OSError as error:
raise errors.AccountStorageError(error)
return Account(regr, key, meta)
@ -243,7 +243,7 @@ class AccountFileStorage(interfaces.AccountStorage):
self._create(account, dir_path)
self._update_meta(account, dir_path)
self._update_regr(account, dir_path)
except IOError as error:
except OSError as error:
raise errors.AccountStorageError(error)
def update_regr(self, account: Account) -> None:
@ -255,7 +255,7 @@ class AccountFileStorage(interfaces.AccountStorage):
try:
dir_path = self._prepare(account)
self._update_regr(account, dir_path)
except IOError as error:
except OSError as error:
raise errors.AccountStorageError(error)
def update_meta(self, account: Account) -> None:
@ -267,7 +267,7 @@ class AccountFileStorage(interfaces.AccountStorage):
try:
dir_path = self._prepare(account)
self._update_meta(account, dir_path)
except IOError as error:
except OSError as error:
raise errors.AccountStorageError(error)
def delete(self, account_id: str) -> None:

View file

@ -117,7 +117,7 @@ def lineage_for_certname(cli_config: configuration.NamespaceConfig,
return None
try:
return storage.RenewableCert(renewal_file, cli_config)
except (errors.CertStorageError, IOError):
except (OSError, errors.CertStorageError):
logger.debug("Renewal conf file %s is broken.", renewal_file)
logger.debug("Traceback was:\n%s", traceback.format_exc())
return None
@ -419,7 +419,7 @@ def _search_lineages(cli_config: configuration.NamespaceConfig, func: Callable[.
for renewal_file in storage.renewal_conf_files(cli_config):
try:
candidate_lineage = storage.RenewableCert(renewal_file, cli_config)
except (errors.CertStorageError, IOError):
except (OSError, errors.CertStorageError):
logger.debug("Renewal conf file %s is broken. Skipping.", renewal_file)
logger.debug("Traceback was:\n%s", traceback.format_exc())
continue

View file

@ -40,7 +40,7 @@ def read_file(filename: str, mode: str = "rb") -> Tuple[str, Any]:
with open(filename, mode) as the_file:
contents = the_file.read()
return filename, contents
except IOError as exc:
except OSError as exc:
raise argparse.ArgumentTypeError(exc.strerror)

View file

@ -124,7 +124,7 @@ class _UnixLockMechanism(_BaseLockMechanism):
"""
try:
fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError as err:
except OSError as err:
if err.errno in (errno.EACCES, errno.EAGAIN):
logger.debug('A lock on %s is held by another process.', self._path)
raise errors.LockError('Another instance of Certbot is already running.')
@ -210,7 +210,7 @@ class _WindowsLockMechanism(_BaseLockMechanism):
# are only defined on Windows. See
# https://github.com/python/typeshed/blob/16ae4c61201cd8b96b8b22cdfb2ab9e89ba5bcf2/stdlib/msvcrt.pyi.
msvcrt.locking(fd, msvcrt.LK_NBLCK, 1) # type: ignore # pylint: disable=used-before-assignment
except (IOError, OSError) as err:
except OSError as err:
if fd:
os.close(fd)
# Anything except EACCES is unexpected. Raise directly the error in that case.

View file

@ -167,7 +167,7 @@ def setup_log_file_handler(config: configuration.NamespaceConfig, logfile: str,
handler = logging.handlers.RotatingFileHandler(
log_file_path, maxBytes=2 ** 20,
backupCount=config.max_log_backups)
except IOError as error:
except OSError as error:
raise errors.Error(util.PERM_ERR_FMT.format(error))
# rotate on each invocation, rollover only possible when maxBytes
# is nonzero and backupCount is nonzero, so we set maxBytes as big

View file

@ -2,7 +2,6 @@
import collections
import errno
import logging
import socket
from typing import Any
from typing import Callable
from typing import DefaultDict
@ -78,7 +77,7 @@ class ServerManager:
try:
servers = acme_standalone.HTTP01DualNetworkedServers(
address, self.http_01_resources)
except socket.error as error:
except OSError as error:
raise errors.StandaloneBindError(error, port)
servers.serve_forever()

View file

@ -74,7 +74,7 @@ def reconstitute(config: configuration.NamespaceConfig,
"""
try:
renewal_candidate = storage.RenewableCert(full_path, config)
except (errors.CertStorageError, IOError) as error:
except (OSError, errors.CertStorageError) as error:
logger.error("Renewal configuration file %s is broken.", full_path)
logger.error("The error was: %s\nSkipping.", str(error))
logger.debug("Traceback was:\n%s", traceback.format_exc())

View file

@ -115,10 +115,10 @@ class LockFileTest(test_util.TempDirTestCase):
mocked_function = 'certbot._internal.lock.msvcrt.locking'
msg = 'hi there'
with mock.patch(mocked_function) as mock_lock:
mock_lock.side_effect = IOError(msg)
mock_lock.side_effect = OSError(msg)
try:
self._call(self.lock_path)
except IOError as err:
except OSError as err:
assert msg in str(err)
else: # pragma: no cover
self.fail('IOError not raised')

View file

@ -59,7 +59,7 @@ class ServerManagerTest(unittest.TestCase):
maybe_another_server = socket.socket()
try:
maybe_another_server.bind(("", port))
except socket.error:
except OSError:
pass
with pytest.raises(errors.StandaloneBindError):
self.mgr.run(port,

View file

@ -190,7 +190,7 @@ class AuthenticatorTest(unittest.TestCase):
with open(permission_canary, "r"):
pass
print("Warning, running tests as root skips permissions tests...")
except IOError:
except OSError:
# ok, permissions work, test away...
with pytest.raises(errors.PluginError):
self.auth.perform([])

View file

@ -65,7 +65,7 @@ class ReverterCheckpointLocalTest(test_util.ConfigTestCase):
def test_add_to_checkpoint_copy_failure(self):
with mock.patch("certbot.reverter.shutil.copy2") as mock_copy2:
mock_copy2.side_effect = IOError("bad copy")
mock_copy2.side_effect = OSError("bad copy")
with pytest.raises(errors.ReverterError):
self.reverter.add_to_checkpoint(self.sets[0], "save1")

View file

@ -308,7 +308,7 @@ def verify_renewable_cert_sig(renewable_cert: interfaces.RenewableCert) -> None:
assert cert.signature_hash_algorithm # always present for RSA and ECDSA
verify_signed_payload(pk, cert.signature, cert.tbs_certificate_bytes,
cert.signature_hash_algorithm)
except (IOError, ValueError, InvalidSignature) as e:
except (OSError, ValueError, InvalidSignature) as e:
error_str = "verifying the signature of the certificate located at {0} has failed. \
Details: {1}".format(renewable_cert.cert_path, e)
logger.exception(error_str)
@ -355,7 +355,7 @@ def verify_cert_matches_priv_key(cert_path: str, key_path: str) -> None:
context.use_certificate_file(cert_path)
context.use_privatekey_file(key_path)
context.check_privatekey()
except (IOError, SSL.Error) as e:
except (OSError, SSL.Error) as e:
error_str = "verifying the certificate located at {0} matches the \
private key located at {1} has failed. \
Details: {2}".format(cert_path,
@ -383,7 +383,7 @@ def verify_fullchain(renewable_cert: interfaces.RenewableCert) -> None:
error_str = "fullchain does not match cert + chain for {0}!"
error_str = error_str.format(renewable_cert.lineagename)
raise errors.Error(error_str)
except IOError as e:
except OSError as e:
error_str = "reading one of cert, chain, or fullchain has failed: {0}".format(e)
logger.exception(error_str)
raise errors.Error(error_str)

View file

@ -48,7 +48,7 @@ class PluginStorage:
try:
with open(self._storagepath, 'r') as fh:
filedata = fh.read()
except IOError as e:
except OSError as e:
errmsg = "Could not read PluginStorage data file: {0} : {1}".format(
self._storagepath, str(e))
if os.path.isfile(self._storagepath):
@ -92,7 +92,7 @@ class PluginStorage:
os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
0o600), 'w') as fh:
fh.write(serialized)
except IOError as e:
except OSError as e:
errmsg = "Could not write PluginStorage data to file {0} : {1}".format(
self._storagepath, str(e))
logger.error(errmsg)

View file

@ -184,7 +184,7 @@ class Reverter:
cp_dir, os.path.basename(filename) + "_" + str(idx)))
op_fd.write('{0}\n'.format(filename))
# https://stackoverflow.com/questions/4726260/effective-use-of-python-shutil-copy2
except IOError:
except OSError:
op_fd.close()
logger.error(
"Unable to add file %s to checkpoint %s",
@ -238,7 +238,7 @@ class Reverter:
shutil.copy2(os.path.join(
cp_dir,
os.path.basename(path) + "_" + str(idx)), path)
except (IOError, OSError):
except OSError:
# This file is required in all checkpoints.
logger.error("Unable to recover files from %s", cp_dir)
raise errors.ReverterError(f"Unable to recover files from {cp_dir}")
@ -327,7 +327,7 @@ class Reverter:
for path in files:
if path not in ex_files:
new_fd.write("{0}\n".format(path))
except (IOError, OSError):
except OSError:
logger.error("Unable to register file creation(s) - %s", files)
raise errors.ReverterError(
"Unable to register file creation(s) - {0}".format(files))
@ -360,7 +360,7 @@ class Reverter:
with open(commands_fp, mode, **kwargs) as f: # type: ignore
csvwriter = csv.writer(f)
csvwriter.writerow(command)
except (IOError, OSError):
except OSError:
logger.error("Unable to register undo command")
raise errors.ReverterError(
"Unable to register undo command.")
@ -434,7 +434,7 @@ class Reverter:
"File: %s - Could not be found to be deleted\n"
" - Certbot probably shut down unexpectedly",
path)
except (IOError, OSError):
except OSError:
logger.critical(
"Unable to remove filepaths contained within %s", file_list)
raise errors.ReverterError(
@ -476,7 +476,7 @@ class Reverter:
# Move self.config.in_progress_dir to Backups directory
shutil.move(changes_since_tmp_path, changes_since_path)
except (IOError, OSError):
except OSError:
logger.error("Unable to finalize checkpoint - adding title")
logger.debug("Exception was:\n%s", traceback.format_exc())
raise errors.ReverterError("Unable to add title")

View file

@ -665,12 +665,12 @@ def is_ipaddress(address: str) -> bool:
socket.inet_pton(socket.AF_INET, address)
# If this line runs it was ip address (ipv4)
return True
except socket.error:
except OSError:
# It wasn't an IPv4 address, so try ipv6
try:
socket.inet_pton(socket.AF_INET6, address)
return True
except socket.error:
except OSError:
return False

View file

@ -185,7 +185,7 @@ def block_until_ssh_open(ipstring, wait_time=10, timeout=120):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((ipstring, 22))
reached = True
except socket.error as err:
except OSError as err:
time.sleep(wait_time)
t_elapsed += wait_time
sock.close()