Merge pull request #7797 from g6123/nginx-utf8

Use UTF-8 encoding for nginx plugin
This commit is contained in:
schoen 2020-03-13 17:07:40 -07:00 committed by GitHub
commit 30ec4cafe1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 192 additions and 3 deletions

View file

@ -266,5 +266,6 @@ Authors
* [Yomna](https://github.com/ynasser)
* [Yoni Jah](https://github.com/yonjah)
* [YourDaddyIsHere](https://github.com/YourDaddyIsHere)
* [Yuseong Cho](https://github.com/g6123)
* [Zach Shepherd](https://github.com/zjs)
* [陈三](https://github.com/chenxsan)

View file

@ -2,6 +2,7 @@
import copy
import functools
import glob
import io
import logging
import re
@ -205,12 +206,16 @@ class NginxParser(object):
if item in self.parsed and not override:
continue
try:
with open(item) as _file:
with io.open(item, "r", encoding="utf-8") as _file:
parsed = nginxparser.load(_file)
self.parsed[item] = parsed
trees.append(parsed)
except IOError:
logger.warning("Could not open file: %s", item)
except UnicodeDecodeError:
logger.warning("Could not read file: %s due to invalid "
"character. Only UTF-8 encoding is "
"supported.", item)
except pyparsing.ParseException as err:
logger.debug("Could not parse file: %s due to %s", item, err)
return trees
@ -414,10 +419,13 @@ class NginxParser(object):
def _parse_ssl_options(ssl_options):
if ssl_options is not None:
try:
with open(ssl_options) as _file:
with io.open(ssl_options, "r", encoding="utf-8") as _file:
return nginxparser.load(_file)
except IOError:
logger.warning("Missing NGINX TLS options file: %s", ssl_options)
except UnicodeDecodeError:
logger.warning("Could not read file: %s due to invalid character. "
"Only UTF-8 encoding is supported.", ssl_options)
except pyparsing.ParseBaseException as err:
logger.debug("Could not parse file: %s due to %s", ssl_options, err)
return []

View file

@ -482,7 +482,43 @@ class NginxParserTest(util.NginxTest):
called = True
self.assertTrue(called)
def test_valid_unicode_characters(self):
nparser = parser.NginxParser(self.config_path)
path = nparser.abs_path('valid_unicode_comments.conf')
parsed = nparser._parse_files(path) # pylint: disable=protected-access
self.assertEqual(['server'], parsed[0][2][0])
self.assertEqual(['listen', '80'], parsed[0][2][1][3])
def test_invalid_unicode_characters(self):
with self.assertLogs() as log:
nparser = parser.NginxParser(self.config_path)
path = nparser.abs_path('invalid_unicode_comments.conf')
parsed = nparser._parse_files(path) # pylint: disable=protected-access
self.assertEqual([], parsed)
self.assertTrue(any(
('invalid character' in output) and ('UTF-8' in output)
for output in log.output
))
def test_valid_unicode_characters_in_ssl_options(self):
nparser = parser.NginxParser(self.config_path)
path = nparser.abs_path('valid_unicode_comments.conf')
parsed = parser._parse_ssl_options(path) # pylint: disable=protected-access
self.assertEqual(['server'], parsed[2][0])
self.assertEqual(['listen', '80'], parsed[2][1][3])
def test_invalid_unicode_characters_in_ssl_options(self):
with self.assertLogs() as log:
nparser = parser.NginxParser(self.config_path)
path = nparser.abs_path('invalid_unicode_comments.conf')
parsed = parser._parse_ssl_options(path) # pylint: disable=protected-access
self.assertEqual([], parsed)
self.assertTrue(any(
('invalid character' in output) and ('UTF-8' in output)
for output in log.output
))
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -0,0 +1,125 @@
"""Backport for `TestCase.assertLogs()`.
Most of the idea and code are from CPython implementation.
https://github.com/python/cpython/blob/b76518d43fb82ed9e5d27025d18c90a23d525c90/Lib/unittest/case.py
"""
import logging
import collections
__all__ = ['AssertLogsMixin']
LoggingWatcher = collections.namedtuple('LoggingWatcher', ['records', 'output'])
class CapturingHandler(logging.Handler):
"""
A logging handler capturing all (raw and formatted) logging output.
"""
def __init__(self):
super(CapturingHandler, self).__init__()
self.watcher = LoggingWatcher([], [])
def flush(self):
pass
def emit(self, record):
self.watcher.records.append(record)
self.watcher.output.append(self.format(record))
class AssertLogsContext(object):
"""
A context manager used to implement `TestCase.assertLogs()`.
"""
LOGGING_FORMAT = '%(levelname)s:%(name)s:%(message)s'
def __init__(self, test_case, logger_name, level):
self.test_case = test_case
self.logger_name = logger_name
self.logger_states = None
self.logger = None
if level:
# pylint: disable=protected-access,no-member
try:
# In Python 3.x
name_to_level = logging._nameToLevel # type: ignore
except AttributeError:
# In Python 2.7
name_to_level = logging._levelNames # type: ignore
self.level = name_to_level.get(level, level)
else:
self.level = logging.INFO
self.watcher = None
def _save_logger_states(self):
self.logger_states = (self.logger.handlers[:], self.logger.level, self.logger.propagate)
def _restore_logger_states(self):
self.logger.handlers, self.logger.level, self.logger.propagate = self.logger_states
def __enter__(self):
if isinstance(self.logger_name, logging.Logger):
self.logger = self.logger_name
else:
self.logger = logging.getLogger(self.logger_name)
formatter = logging.Formatter(self.LOGGING_FORMAT)
handler = CapturingHandler()
handler.setFormatter(formatter)
self._save_logger_states()
self.logger.handlers = [handler]
self.logger.setLevel(self.level)
self.logger.propagate = False
self.watcher = handler.watcher
return handler.watcher
def __exit__(self, exc_type, exc_value, tb):
self._restore_logger_states()
if exc_type is not None:
# let unexpected exceptions pass through
return
if not self.watcher.records:
self._raiseFailure(
"no logs of level {} or higher triggered on {}"
.format(logging.getLevelName(self.level), self.logger.name))
def _raiseFailure(self, message):
message = self.test_case._formatMessage(None, message) # pylint: disable=protected-access
raise self.test_case.failureException(message)
class AssertLogsMixin(object):
"""
A mixin that implements `TestCase.assertLogs()`.
"""
def assertLogs(self, logger=None, level=None):
"""Fail unless a log message of level *level* or higher is emitted
on *logger_name* or its children. If omitted, *level* defaults to
INFO and *logger* defaults to the root logger.
This method must be used as a context manager, and will yield
a recording object with two attributes: `output` and `records`.
At the end of the context manager, the `output` attribute will
be a list of the matching formatted log messages and the
`records` attribute will be a list of the corresponding LogRecord
objects.
Example::
with self.assertLogs('foo', level='INFO') as cm:
logging.getLogger('foo').info('first message')
logging.getLogger('foo.bar').error('second message')
self.assertEqual(cm.output, ['INFO:foo:first message',
'ERROR:foo.bar:second message'])
"""
return AssertLogsContext(self, logger, level)

View file

@ -14,9 +14,10 @@ from certbot.plugins import common
from certbot.tests import util as test_util
from certbot_nginx._internal import configurator
from certbot_nginx._internal import nginxparser
import test_log_util
class NginxTest(test_util.ConfigTestCase):
class NginxTest(test_log_util.AssertLogsMixin, test_util.ConfigTestCase):
def setUp(self):
super(NginxTest, self).setUp()

View file

@ -0,0 +1,7 @@
# This configuration file is saved with EUC-KR (a.k.a. cp949) encoding,
# including some Korean letters.
server {
# 안녕하세요. 80번 포트에서 요청을 기다린다.
listen 80;
}

View file

@ -0,0 +1,9 @@
# This configuration file is saved with valid UTF-8 encoding,
# including some CJK alphabets.
server {
# 안녕하세요. 80번 포트에서 요청을 기다린다.
# こんにちは。80番ポートからリクエストを待つ。
# 你好。等待端口80上的请求。
listen 80;
}

View file

@ -22,6 +22,8 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
* When using an RFC 8555 compliant endpoint, the `acme` library no longer sends the
`resource` field in any requests or the `type` field when responding to challenges.
* Fix nginx plugin crash when non-ASCII configuration file is being read (instead,
the user will be warned that UTF-8 must be used).
More details about these changes can be found on our GitHub repo.