further fixes and tests for email_to_pdf

This commit is contained in:
Marc Durepos 2025-03-27 16:34:09 -04:00
parent 85964620cf
commit bfbc5d6491
5 changed files with 202 additions and 177 deletions

View file

@ -15,7 +15,7 @@ creation process to continue.
""",
"author": "Bemade",
"website": "https://bemade.org",
"depends": ["account"],
"depends": ["account", "mail"],
"data": [],
"installable": True,
"auto_install": False,

View file

@ -5,8 +5,8 @@ import subprocess
import tempfile
from contextlib import closing
from datetime import datetime
from email.utils import formatdate
from html import escape
import re
from odoo import models, fields
from odoo.tools.misc import find_in_path
@ -17,36 +17,37 @@ _logger = logging.getLogger(__name__)
class AccountMove(models.Model):
_inherit = "account.move"
def _check_and_decode_attachment(self, attachments):
"""Override to convert email message to PDF if no attachments are present."""
if not attachments or self.env.context.get("no_new_invoice"):
# Original code would return False here, causing email rejection
# Instead, we'll create a PDF from the email message
message_dict = self.env.context.get("message_dict", {})
if message_dict:
try:
# Create a PDF from the email content
pdf_attachment = self._create_pdf_from_email(message_dict)
if pdf_attachment:
_logger.info(
"Successfully created PDF attachment, proceeding with invoice creation"
)
# We need to return the result of _extend_with_attachments directly
# as that's what the original method would return
# Convert the list to a recordset before passing to _extend_with_attachments
attachment_recordset = self.env["ir.attachment"].browse(
[pdf_attachment.id]
)
return self._extend_with_attachments(
attachment_recordset,
new=bool(self._context.get("from_alias")),
)
except Exception as e:
_logger.exception("Error creating PDF from email: %s", e)
def message_new(self, msg_dict, custom_values={}):
"""Override to create a PDF attachment from the email content before processing.
This ensures that emails without attachments can still be processed and converted to invoices.
"""
# Get existing attachments in the email
attachments = msg_dict.get("attachments", [])
# Proceed with the original method
# If we were unable to generate a PDF, the original method will bounce the email
return super()._check_and_decode_attachment(attachments)
# Check if there are attachments that are not the .eml of the message itself
regex = re.compile(r"^.*\.eml$", re.IGNORECASE)
if len(attachments) > 1 or (len(attachments) == 1 and not regex.match(attachments[0][0])):
return super().message_new(msg_dict, custom_values=custom_values)
try:
# Create a PDF from the email content
pdf_attachment = self._create_pdf_from_email(msg_dict)
if pdf_attachment:
# Add the PDF to the message's attachments
if not msg_dict.get("attachments"):
msg_dict["attachments"] = []
# Convert the attachment to the format expected by mail_thread
# Format: (name, base64_data, info_dict)
attachment_data = (
pdf_attachment["name"],
pdf_attachment["datas"],
{"mimetype": "application/pdf"},
)
attachments.append(attachment_data)
except Exception as e:
_logger.exception("Error creating PDF from email: %s", e)
return super().message_new(msg_dict, custom_values=custom_values)
@classmethod
def _html_to_pdf(cls, html_content):
@ -58,6 +59,8 @@ class AccountMove(models.Model):
Returns:
bytes: PDF content as bytes or False if conversion failed
"""
# Check if wkhtmltopdf is installed
wkhtmltopdf_bin = find_in_path("wkhtmltopdf")
if not wkhtmltopdf_bin:
_logger.error("Cannot find wkhtmltopdf executable in system path")
@ -90,23 +93,26 @@ class AccountMove(models.Model):
command.append(html_file_path)
command.append(pdf_file_path)
# Execute wkhtmltopdf
process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
out, err = process.communicate()
_, err = process.communicate()
if process.returncode not in [0, 1]:
if process.returncode != 0:
_logger.error(
"wkhtmltopdf failed with error code %s: %s", process.returncode, err
)
return False
# Read the generated PDF
with open(pdf_file_path, "rb") as pdf_file:
pdf_content = pdf_file.read()
try:
with open(pdf_file_path, "rb") as pdf_file:
pdf_content = pdf_file.read()
return pdf_content
return pdf_content
except Exception as e:
_logger.exception("Error reading generated PDF file: %s", e)
return False
except Exception as e:
_logger.exception("Error during PDF generation: %s", e)
@ -127,7 +133,8 @@ class AccountMove(models.Model):
message_dict (dict): Email message dictionary
Returns:
ir.attachment: The created attachment record or False if failed
dict: Dictionary with keys `name` and `datas` containing the name and
base64 encoded data of the attachment
"""
# Extract email details
email_from = message_dict.get("email_from", "Unknown Sender")
@ -139,10 +146,16 @@ class AccountMove(models.Model):
if isinstance(email_date, datetime):
email_date = email_date.strftime("%Y-%m-%d %H:%M:%S")
# Check if body is empty or None
if not body:
_logger.warning("Email body is empty, using placeholder content")
body = "<p>This email did not contain any body content.</p>"
# Create HTML content for the PDF
html_content = f"""
<html>
<head>
<meta charset="UTF-8">
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.email-header {{ border-bottom: 1px solid #ccc; padding-bottom: 10px; margin-bottom: 20px; }}
@ -171,14 +184,10 @@ class AccountMove(models.Model):
# Create a proper ir.attachment record
filename = f"Email_{subject.replace(' ', '_')[:30]}.pdf"
attachment_vals = {
attachment = {
"name": filename,
"datas": base64.b64encode(pdf_content),
"mimetype": "application/pdf",
"res_model": "mail.message",
"res_id": message_dict.get("id", 0),
}
attachment = self.env["ir.attachment"].create(attachment_vals)
_logger.info("Created PDF attachment from email: %s", filename)
return attachment

View file

@ -1,2 +1 @@
from . import test_email_to_pdf
from . import test_email_integration

View file

@ -1,88 +0,0 @@
import base64
from odoo.tests.common import TransactionCase
from odoo.tools.misc import find_in_path
from unittest.mock import patch
class TestEmailProcessing(TransactionCase):
"""Integration tests for email processing with PDF generation."""
@classmethod
def setUpClass(cls):
super().setUpClass()
# Check if wkhtmltopdf is available
cls.wkhtmltopdf_available = bool(find_in_path("wkhtmltopdf"))
# Get the model class to access the methods
cls.account_move = cls.env["account.move"]
def test_create_pdf_from_email(self):
"""Test that an email without attachments can be converted to PDF.
This test directly verifies that the _create_pdf_from_email method
correctly generates a PDF attachment from an email message without
attachments, allowing the invoice creation process to continue.
"""
if not self.wkhtmltopdf_available:
self.skipTest("wkhtmltopdf not available")
# Create a sample email message dictionary (similar to what would be parsed from an email)
message_dict = {
"subject": "Test Invoice",
"from": "test@example.com",
"to": "invoices@example.com",
"body": "<html><body><h1>Invoice Test</h1><p>This is a test invoice.</p></body></html>",
"attachments": [], # No attachments
"message_id": "<test123@example.com>",
}
# Call the method directly to create a PDF from the email
attachment = self.account_move._create_pdf_from_email(message_dict)
# Verify that an attachment was created
self.assertTrue(attachment, "An attachment should have been created")
# The actual name format is 'Email_' + subject + '.pdf' with spaces replaced by underscores
self.assertEqual(
attachment.name,
"Email_Test_Invoice.pdf",
"Attachment name should match expected format",
)
self.assertEqual(
attachment.mimetype, "application/pdf", "Attachment should be a PDF"
)
# Verify the content of the PDF attachment
pdf_data = base64.b64decode(attachment.datas)
self.assertTrue(pdf_data.startswith(b"%PDF-"), "Content should be a valid PDF")
self.assertTrue(len(pdf_data) > 100, "PDF should have a reasonable size")
def test_check_and_decode_attachment_with_empty_attachments(self):
"""Test that _check_and_decode_attachment doesn't reject emails with no attachments."""
if not self.wkhtmltopdf_available:
self.skipTest("wkhtmltopdf not available")
# Set up a context with a message_dict to simulate email processing
message_dict = {
"subject": "Test Invoice",
"from": "test@example.com",
"to": "invoices@example.com",
"body": "<html><body><h1>Invoice Test</h1><p>This is a test invoice.</p></body></html>",
"attachments": [], # No attachments
"message_id": "<test123@example.com>",
}
# Call the method with an empty attachments list
# We need to pass the message_dict in the context so _create_pdf_from_email can access it
result = self.account_move.with_context(
message_dict=message_dict
)._check_and_decode_attachment([])
# Verify that the result is not False (which would mean email rejection)
self.assertNotEqual(
result,
False,
"Should not reject the email when no attachments are provided",
)
# Verify that the result contains attachment data
self.assertTrue(result, "Should return attachment data")

View file

@ -1,11 +1,33 @@
from odoo.tests.common import TransactionCase
from odoo.tools.misc import find_in_path
import logging
from datetime import datetime
_logger = logging.getLogger(__name__)
class TestHtmlToPdf(TransactionCase):
"""
Test the functionality of converting an email to a PDF when it's received for a supplier invoice.
This test simulates the full flow of an email being received through
the mail alias and verifies that an account move is created with a PDF
attachment generated from the email content.
It's important to also run the tests in account, which can be done using the test tag:
`/account:TestAccountIncomingSupplierInvoice.test_extend_with_attachments_document_formats`
This specific test ensures we are not creating more attachments than necessary.
"""
@classmethod
def setUpClass(cls):
super().setUpClass()
# Set up sender and alias for testing
cls.sender_email = "sender@example.com"
cls.alias_email = "test-invoices@example.com"
# Check if wkhtmltopdf is available
cls.wkhtmltopdf_available = bool(find_in_path("wkhtmltopdf"))
@ -33,6 +55,35 @@ class TestHtmlToPdf(TransactionCase):
</html>
"""
# Set up a supplier for testing
cls.supplier = cls.env["res.partner"].create(
{
"name": "Test Supplier",
"email": cls.sender_email,
}
)
# Set up a journal for incoming invoices
cls.journal = cls.env["account.journal"].search(
[("type", "=", "purchase")], limit=1
)
# Set up the mail alias domain
cls.alias_domain = cls.env["mail.alias.domain"].create({"name": "example.com"})
# Set up a mail alias for the journal
cls.alias = cls.env["mail.alias"].create(
{
"alias_name": cls.alias_email,
"alias_model_id": cls.env["ir.model"]
.search([("model", "=", "account.move")], limit=1)
.id,
"alias_domain_id": cls.alias_domain.id,
"alias_defaults": f'{{"move_type": "in_invoice", "journal_id": {cls.journal.id}}}',
}
)
cls.journal.alias_id = cls.alias
def test_html_to_pdf_conversion(self):
"""Test the direct HTML to PDF conversion."""
if not self.wkhtmltopdf_available:
@ -50,53 +101,107 @@ class TestHtmlToPdf(TransactionCase):
)
self.assertTrue(len(pdf_content) > 100, "PDF should have reasonable size")
def test_html_to_pdf_with_complex_content(self):
"""Test HTML to PDF conversion with more complex content."""
if not self.wkhtmltopdf_available:
self.skipTest("wkhtmltopdf not available")
# Integration test
def test_account_email_to_pdf_full_flow(self):
"""Test that an email without attachments is correctly converted to PDF
and an account move is created.
# More complex HTML with tables and images
complex_html = """
<html>
<head>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
</style>
</head>
<body>
<h1>Complex HTML Test</h1>
<table>
<tr>
<th>Header 1</th>
<th>Header 2</th>
<th>Header 3</th>
</tr>
<tr>
<td>Row 1, Cell 1</td>
<td>Row 1, Cell 2</td>
<td>Row 1, Cell 3</td>
</tr>
<tr>
<td>Row 2, Cell 1</td>
<td>Row 2, Cell 2</td>
<td>Row 2, Cell 3</td>
</tr>
</table>
</body>
</html>
This test simulates the full flow of an email being received through
the mail alias and verifies that an account move is created with a PDF
attachment generated from the email content.
"""
# Convert complex HTML to PDF
pdf_content = self.account_move._html_to_pdf(complex_html)
# Get the current timestamp
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
message_id = f"<test123_{timestamp}@example.com>"
subject = f"Invoice from Test Supplier {timestamp}"
date = datetime.now().strftime("%a, %d %b %Y %H:%M:%S +0000")
# Verify the PDF was created
self.assertTrue(
pdf_content, "PDF content should be generated from complex HTML"
# HTML body of the email
html_body = "<html><body><h1>Invoice Test</h1><p>This is a test invoice from Test Supplier.</p><p>Amount: $100.00</p><p>Date: 2025-03-27</p></body></html>"
# Construct the raw email with proper headers
# Make sure the From header is correctly formatted for Odoo to parse
raw_email = f"""Return-Path: <{self.sender_email}>
X-Original-To: {self.alias_email}
Delivered-To: {self.alias_email}
Received: from mail.example.com (mail.example.com [192.168.1.1])
From: {self.sender_email}
To: {self.alias_email}
Subject: {subject}
Date: {date}
Message-ID: {message_id}
MIME-Version: 1.0
Content-Type: text/html; charset=UTF-8
Content-Transfer-Encoding: 7bit
{html_body}"""
# Process the email through the mail gateway
# This simulates what happens when fetchmail receives an email
invoice_count_before = self.env["account.move"].search_count(
[("move_type", "=", "in_invoice")]
)
self.assertTrue(
pdf_content.startswith(b"%PDF-"), "Content should be a valid PDF"
# Process the email through the mail gateway
# We need to specify the model explicitly since we're in a test environment
# In production, the alias would determine this automatically
self.env["mail.thread"].with_context(fetchmail_server_id=1).message_process(
model=None,
message=raw_email,
save_original=True,
strip_attachments=False,
)
# Count account moves after the test
# Note: move_type is the correct field name in Odoo, even if the linter doesn't recognize it
move_count_after = self.env["account.move"].search_count(
[("move_type", "=", "in_invoice")]
)
self.assertEqual(
move_count_after,
invoice_count_before + 1,
"A new account move should be created",
)
# Find the newly created invoice
invoice = self.env["account.move"].search(
[("move_type", "=", "in_invoice")], order="id desc", limit=1
)
self.assertTrue(invoice, "An account move should be created")
messages = invoice.message_ids
self.assertEqual(len(messages), 1, "The account move should have one message")
# Debug message attachments
_logger.info("Message ID: %s", messages.id)
_logger.info("Message attachments: %s", messages.attachment_ids)
_logger.info("Message attachment count: %s", len(messages.attachment_ids))
# Debug all attachments in the system
all_attachments = self.env["ir.attachment"].search(
[("res_model", "=", "mail.message"), ("res_id", "=", messages.id)]
)
_logger.info("All attachments for this message: %s", all_attachments)
_logger.info("All attachment count: %s", len(all_attachments))
attachment = messages.attachment_ids
self.assertTrue(attachment, "The message should have an attachment")
attachment = attachment.filtered(
lambda a: a.mimetype == "application/pdf" or ".pdf" in a.name
)
self.assertTrue(attachment, "The message should have a PDF attachment")
# Verify the invoice is linked to the correct supplier
self.assertEqual(
invoice.partner_id,
self.supplier,
"The invoice should be linked to the correct supplier",
)
# Verify the invoice type
self.assertEqual(
invoice.move_type, "in_invoice", "The invoice should be an incoming invoice"
)
self.assertTrue(len(pdf_content) > 100, "PDF should have reasonable size")