+
|
+
Field | +Value | +
---|---|
${label} | +${value} | +
+ |
+
+
+
+
+
|
+ + |
keep tags
" + self.api_session.patch( + self.document_url, + json={"blocks": new_blocks}, + ) + transaction.commit() + + self.assertEqual( + self.document.blocks["form-id"]["send_message"], + "click herekeep tags
", + ) diff --git a/backend/src/collective/volto/formsupport/tests/test_event.py b/backend/src/collective/volto/formsupport/tests/test_event.py new file mode 100644 index 0000000..5e48444 --- /dev/null +++ b/backend/src/collective/volto/formsupport/tests/test_event.py @@ -0,0 +1,139 @@ +from collective.volto.formsupport.testing import ( # noqa: E501, + VOLTO_FORMSUPPORT_API_FUNCTIONAL_TESTING, +) +from plone import api +from plone.app.testing import setRoles +from plone.app.testing import SITE_OWNER_NAME +from plone.app.testing import SITE_OWNER_PASSWORD +from plone.app.testing import TEST_USER_ID +from plone.registry.interfaces import IRegistry +from plone.restapi.testing import RelativeSession +from Products.MailHost.interfaces import IMailHost +from zope.component import getUtility +from zope.configuration import xmlconfig + +import re +import transaction +import unittest + + +def event_handler(event): + event.data["data"].append( + {"label": "Reply", "value": "hello"}, + ) + + +class TestEvent(unittest.TestCase): + layer = VOLTO_FORMSUPPORT_API_FUNCTIONAL_TESTING + + def setUp(self): + xmlconfig.string( + """ +This message will be sent to the person filling in the form.
It is Rich Text
" + }, + "subblocks": [ + { + "field_id": "contact", + "field_type": "from", + }, + { + "field_id": "message", + "field_type": "text", + }, + { + "field_id": "name", + "field_type": "text", + }, + ], + }, + } + transaction.commit() + + response = self.submit_form( + data={ + "data": [ + { + "field_id": "message", + "label": "Message", + "value": "just want to say hi", + }, + {"field_id": "name", "label": "Name", "value": "Smith"}, + {"field_id": "contact", "label": "Email", "value": "smith@doe.com"}, + ], + "block_id": "form-id", + }, + ) + transaction.commit() + + self.assertEqual(response.status_code, 200) + msg = self.mailhost.messages[0] + if isinstance(msg, bytes) and bytes is not str: + # Python 3 with Products.MailHost 4.10+ + msg = msg.decode("utf-8") + + parsed_msg = Parser().parsestr(msg) + self.assertEqual(parsed_msg.get("from"), "john@doe.com") + self.assertEqual(parsed_msg.get("to"), "smith@doe.com") + self.assertEqual(parsed_msg.get("subject"), "block subject") + msg_body = parsed_msg.get_payload()[1].get_payload().replace("=\r\n", "") + self.assertIn( + "This message will be sent to the person filling in the form.
", + msg_body, + ) + self.assertIn("It is Rich Text
", msg_body) + + def test_send_recipient_and_acknowledgement(self): + self.document.blocks = { + "text-id": {"@type": "text"}, + "form-id": { + "@type": "form", + "default_subject": "block subject", + "default_from": "john@doe.com", + "send": ["recipient", "acknowledgement"], + "acknowledgementFields": "contact", + "acknowledgementMessage": { + "data": "This message will be sent to the person filling in the form.
It is Rich Text
" + }, + "subblocks": [ + { + "field_id": "contact", + "field_type": "from", + }, + { + "field_id": "message", + "field_type": "text", + }, + { + "field_id": "name", + "field_type": "text", + }, + ], + }, + } + transaction.commit() + + response = self.submit_form( + data={ + "data": [ + { + "field_id": "message", + "label": "Message", + "value": "just want to say hi", + }, + {"field_id": "name", "label": "Name", "value": "Smith"}, + {"field_id": "contact", "label": "Email", "value": "smith@doe.com"}, + ], + "block_id": "form-id", + }, + ) + transaction.commit() + + self.assertEqual(response.status_code, 200) + + msg = self.mailhost.messages[0] + if isinstance(msg, bytes) and bytes is not str: + # Python 3 with Products.MailHost 4.10+ + msg = msg.decode("utf-8") + parsed_msg = Parser().parsestr(msg) + self.assertEqual(parsed_msg.get("from"), "john@doe.com") + self.assertEqual(parsed_msg.get("to"), "site_addr@plone.com") + self.assertEqual(parsed_msg.get("subject"), "block subject") + + msg_body = parsed_msg.get_payload()[1].get_payload() + msg_body = re.sub(r"\s+", " ", msg_body) + self.assertIn("Message: just want to say hi", msg_body) + self.assertIn("Name: Smith", msg_body) + + acknowledgement_message = self.mailhost.messages[1] + if isinstance(acknowledgement_message, bytes) and bytes is not str: + # Python 3 with Products.MailHost 4.10+ + acknowledgement_message = acknowledgement_message.decode("utf-8") + + parsed_ack_msg = Parser().parsestr(acknowledgement_message) + self.assertEqual(parsed_ack_msg.get("from"), "john@doe.com") + self.assertEqual(parsed_ack_msg.get("to"), "smith@doe.com") + self.assertEqual(parsed_ack_msg.get("subject"), "block subject") + + ack_msg_body = ( + parsed_ack_msg.get_payload()[1].get_payload().replace("=\r\n", "") + ) + self.assertIn( + "This message will be sent to the person filling in the form.
", + ack_msg_body, + ) + self.assertIn("It is Rich Text
", ack_msg_body) + + def test_email_body_formated_as_table( + self, + ): + self.document.blocks = { + "form-id": { + "@type": "form", + "send": True, + "email_format": "table", + "subblocks": [ + { + "field_id": "message", + "field_type": "text", + }, + { + "field_id": "name", + "field_type": "text", + }, + ], + }, + } + transaction.commit() + + subject = "test subject" + name = "John" + message = "just want to say hi" + + response = self.submit_form( + data={ + "from": "john@doe.com", + "data": [ + {"field_id": "message", "label": "Message", "value": message}, + {"field_id": "name", "label": "Name", "value": name}, + ], + "subject": subject, + "block_id": "form-id", + }, + ) + transaction.commit() + self.assertEqual(response.status_code, 200) + msg = self.mailhost.messages[0] + if isinstance(msg, bytes) and bytes is not str: + # Python 3 with Products.MailHost 4.10+ + msg = msg.decode("utf-8") + msg = re.sub(r"\s+", " ", msg).replace(" >", ">") + + self.assertIn(f"Subject: {subject}", msg) + self.assertIn("From: john@doe.com", msg) + self.assertIn("To: site_addr@plone.com", msg) + self.assertIn("Reply-To: john@doe.com", msg) + + self.assertIn("""keep tags
", + }, + { + "field_id": "name", + "label": "Name", + "value": " foo", + }, + ], + "subject": "test subject", + "block_id": "form-id", + }, + ) + transaction.commit() + self.assertEqual(response.status_code, 200) + res = response.json() + self.assertEqual( + res, + { + "data": [ + { + "field_id": "message", + "label": "Message", + "value": "click here keep tags", + }, + { + "field_id": "name", + "label": "Name", + "value": "alert(‘XSS’) foo", + }, + ] + }, + ) + msg = self.mailhost.messages[0] + if isinstance(msg, bytes) and bytes is not str: + # Python 3 with Products.MailHost 4.10+ + msg = msg.decode("utf-8") + msg = re.sub(r"\s+", " ", msg) + self.assertIn( + "Message: click here keep tags", + msg, + ) + self.assertIn("Name: alert(=E2=80=98XSS=E2=80=99) foo", msg) diff --git a/backend/src/collective/volto/formsupport/tests/test_serialize_block.py b/backend/src/collective/volto/formsupport/tests/test_serialize_block.py new file mode 100644 index 0000000..1d89002 --- /dev/null +++ b/backend/src/collective/volto/formsupport/tests/test_serialize_block.py @@ -0,0 +1,231 @@ +from collective.volto.formsupport.testing import ( # noqa: E501, + VOLTO_FORMSUPPORT_API_FUNCTIONAL_TESTING, +) +from plone import api +from plone.app.testing import setRoles +from plone.app.testing import SITE_OWNER_NAME +from plone.app.testing import SITE_OWNER_PASSWORD +from plone.app.testing import TEST_USER_ID +from plone.formwidget.hcaptcha.interfaces import IHCaptchaSettings +from plone.formwidget.recaptcha.interfaces import IReCaptchaSettings +from plone.registry.interfaces import IRegistry +from plone.restapi.testing import RelativeSession +from zope.component import getUtility + +import os +import transaction +import unittest + + +class TestBlockSerialization(unittest.TestCase): + layer = VOLTO_FORMSUPPORT_API_FUNCTIONAL_TESTING + + def setUp(self): + self.app = self.layer["app"] + self.portal = self.layer["portal"] + self.portal_url = self.portal.absolute_url() + setRoles(self.portal, TEST_USER_ID, ["Manager"]) + + self.api_session = RelativeSession(self.portal_url) + self.api_session.headers.update({"Accept": "application/json"}) + self.api_session.auth = (SITE_OWNER_NAME, SITE_OWNER_PASSWORD) + self.anon_api_session = RelativeSession(self.portal_url) + self.anon_api_session.headers.update({"Accept": "application/json"}) + + self.document = api.content.create( + type="Document", + title="Example context", + container=self.portal, + ) + self.document.blocks = { + "text-id": {"@type": "text"}, + "form-id": { + "@type": "form", + "default_from": "foo@foo.com", + "default_bar": "bar", + "subblocks": [ + {"field_id": "name", "label": "Name", "type": "text"}, + {"field_id": "surname", "label": "Surname", "type": "text"}, + ], + }, + } + self.document_url = self.document.absolute_url() + api.content.transition(obj=self.document, transition="publish") + transaction.commit() + + def tearDown(self): + self.api_session.close() + self.anon_api_session.close() + + def test_serializer_return_full_block_data_to_admin(self): + response = self.api_session.get(self.document_url) + res = response.json() + self.assertEqual(res["blocks"]["form-id"], self.document.blocks["form-id"]) + + def test_serializer_return_filtered_block_data_to_anon(self): + response = self.anon_api_session.get(self.document_url) + res = response.json() + self.assertNotEqual(res["blocks"]["form-id"], self.document.blocks["form-id"]) + self.assertNotIn("default_from", res["blocks"]["form-id"].keys()) + self.assertNotIn("default_foo", res["blocks"]["form-id"].keys()) + self.assertIn("subblocks", res["blocks"]["form-id"].keys()) + + +class TestBlockSerializationRecaptcha(unittest.TestCase): + layer = VOLTO_FORMSUPPORT_API_FUNCTIONAL_TESTING + + def setUp(self): + self.app = self.layer["app"] + self.portal = self.layer["portal"] + self.portal_url = self.portal.absolute_url() + setRoles(self.portal, TEST_USER_ID, ["Manager"]) + + self.api_session = RelativeSession(self.portal_url) + self.api_session.headers.update({"Accept": "application/json"}) + self.api_session.auth = (SITE_OWNER_NAME, SITE_OWNER_PASSWORD) + self.anon_api_session = RelativeSession(self.portal_url) + self.anon_api_session.headers.update({"Accept": "application/json"}) + + self.document = api.content.create( + type="Document", + title="Example context", + container=self.portal, + ) + self.document.blocks = { + "text-id": {"@type": "text"}, + "form-id": { + "@type": "form", + "default_from": "foo@foo.com", + "default_bar": "bar", + "subblocks": [ + {"field_id": "name", "label": "Name", "type": "text"}, + {"field_id": "surname", "label": "Surname", "type": "text"}, + ], + "captcha": "recaptcha", + }, + } + self.document_url = self.document.absolute_url() + api.content.transition(obj=self.document, transition="publish") + + self.registry = getUtility(IRegistry) + self.registry.registerInterface(IReCaptchaSettings) + settings = self.registry.forInterface(IReCaptchaSettings) + settings.public_key = "public" + settings.private_key = "private" + transaction.commit() + + def tearDown(self): + self.api_session.close() + self.anon_api_session.close() + + def test_serializer_with_recaptcha(self): + response = self.anon_api_session.get(self.document_url) + res = response.json() + self.assertEqual( + res["blocks"]["form-id"]["captcha_props"], + {"provider": "recaptcha", "public_key": "public"}, + ) + + +class TestBlockSerializationHCaptcha(unittest.TestCase): + layer = VOLTO_FORMSUPPORT_API_FUNCTIONAL_TESTING + + def setUp(self): + self.app = self.layer["app"] + self.portal = self.layer["portal"] + self.portal_url = self.portal.absolute_url() + setRoles(self.portal, TEST_USER_ID, ["Manager"]) + + self.api_session = RelativeSession(self.portal_url) + self.api_session.headers.update({"Accept": "application/json"}) + self.api_session.auth = (SITE_OWNER_NAME, SITE_OWNER_PASSWORD) + self.anon_api_session = RelativeSession(self.portal_url) + self.anon_api_session.headers.update({"Accept": "application/json"}) + + self.document = api.content.create( + type="Document", + title="Example context", + container=self.portal, + ) + self.document.blocks = { + "text-id": {"@type": "text"}, + "form-id": { + "@type": "form", + "default_from": "foo@foo.com", + "default_bar": "bar", + "subblocks": [ + {"field_id": "name", "label": "Name", "type": "text"}, + {"field_id": "surname", "label": "Surname", "type": "text"}, + ], + "captcha": "hcaptcha", + }, + } + self.document_url = self.document.absolute_url() + api.content.transition(obj=self.document, transition="publish") + + self.registry = getUtility(IRegistry) + self.registry.registerInterface(IHCaptchaSettings) + settings = self.registry.forInterface(IHCaptchaSettings) + settings.public_key = "public" + settings.private_key = "private" + transaction.commit() + + def tearDown(self): + self.api_session.close() + self.anon_api_session.close() + + def test_serializer_with_hcaptcha(self): + response = self.anon_api_session.get(self.document_url) + res = response.json() + self.assertEqual( + res["blocks"]["form-id"]["captcha_props"], + {"provider": "hcaptcha", "public_key": "public"}, + ) + + +class TestBlockSerializationAttachmentsLimit(unittest.TestCase): + layer = VOLTO_FORMSUPPORT_API_FUNCTIONAL_TESTING + + def setUp(self): + self.app = self.layer["app"] + self.portal = self.layer["portal"] + self.portal_url = self.portal.absolute_url() + setRoles(self.portal, TEST_USER_ID, ["Manager"]) + + self.api_session = RelativeSession(self.portal_url) + self.api_session.headers.update({"Accept": "application/json"}) + self.api_session.auth = (SITE_OWNER_NAME, SITE_OWNER_PASSWORD) + self.anon_api_session = RelativeSession(self.portal_url) + self.anon_api_session.headers.update({"Accept": "application/json"}) + + self.document = api.content.create( + type="Document", + title="Example context", + container=self.portal, + ) + self.document.blocks = { + "text-id": {"@type": "text"}, + "form-id": { + "@type": "form", + "default_from": "foo@foo.com", + "default_bar": "bar", + "subblocks": [ + {"field_id": "name", "label": "Name", "type": "text"}, + {"field_id": "surname", "label": "Surname", "type": "text"}, + ], + }, + } + self.document_url = self.document.absolute_url() + api.content.transition(obj=self.document, transition="publish") + + transaction.commit() + + def tearDown(self): + self.api_session.close() + self.anon_api_session.close() + os.environ["FORM_ATTACHMENTS_LIMIT"] = "" + + def test_serializer_without_attachments_limit(self): + response = self.anon_api_session.get(self.document_url) + res = response.json() + self.assertNotIn("attachments_limit", res["blocks"]["form-id"]) diff --git a/backend/src/collective/volto/formsupport/tests/test_setup.py b/backend/src/collective/volto/formsupport/tests/test_setup.py new file mode 100644 index 0000000..a9f39fc --- /dev/null +++ b/backend/src/collective/volto/formsupport/tests/test_setup.py @@ -0,0 +1,85 @@ +"""Setup tests for this package.""" + +from collective.volto.formsupport.testing import ( # noqa: E501, + VOLTO_FORMSUPPORT_INTEGRATION_TESTING, +) +from plone import api +from plone.app.testing import setRoles +from plone.app.testing import TEST_USER_ID + +import unittest + + +try: + from plone.base.utils import get_installer +except ImportError: + from Products.CMFPlone.utils import get_installer + + +class TestSetup(unittest.TestCase): + """Test that collective.volto.formsupport is properly installed.""" + + layer = VOLTO_FORMSUPPORT_INTEGRATION_TESTING + + def setUp(self): + """Custom shared utility setup for tests.""" + self.portal = self.layer["portal"] + self.installer = get_installer(self.portal, self.layer["request"]) + + def test_product_installed(self): + """Test if collective.volto.formsupport is installed.""" + if hasattr(self.installer, "isProductInstalled"): + self.assertTrue( + self.installer.isProductInstalled("collective.volto.formsupport") + ) + else: # plone 6 + self.assertTrue( + self.installer.is_product_installed("collective.volto.formsupport") + ) + + def test_browserlayer(self): + """Test that ICollectiveVoltoFormsupportLayer is registered.""" + from collective.volto.formsupport.interfaces import ( + ICollectiveVoltoFormsupportLayer, + ) + from plone.browserlayer import utils + + self.assertIn(ICollectiveVoltoFormsupportLayer, utils.registered_layers()) + + +class TestUninstall(unittest.TestCase): + layer = VOLTO_FORMSUPPORT_INTEGRATION_TESTING + + def setUp(self): + self.portal = self.layer["portal"] + if get_installer: + self.installer = get_installer(self.portal, self.layer["request"]) + else: + self.installer = api.portal.get_tool("portal_quickinstaller") + roles_before = api.user.get_roles(TEST_USER_ID) + setRoles(self.portal, TEST_USER_ID, ["Manager"]) + if hasattr(self.installer, "uninstallProducts"): + self.installer.uninstallProducts(["collective.volto.formsupport"]) + else: # plone6 + self.installer.uninstall_product("collective.volto.formsupport") + setRoles(self.portal, TEST_USER_ID, roles_before) + + def test_product_uninstalled(self): + """Test if collective.volto.formsupport is cleanly uninstalled.""" + if hasattr(self.installer, "isProductInstalled"): + self.assertFalse( + self.installer.isProductInstalled("collective.volto.formsupport") + ) + else: # plone 6 + self.assertFalse( + self.installer.is_product_installed("collective.volto.formsupport") + ) + + def test_browserlayer_removed(self): + """Test that ICollectiveVoltoFormsupportLayer is removed.""" + from collective.volto.formsupport.interfaces import ( + ICollectiveVoltoFormsupportLayer, + ) + from plone.browserlayer import utils + + self.assertNotIn(ICollectiveVoltoFormsupportLayer, utils.registered_layers()) diff --git a/backend/src/collective/volto/formsupport/tests/test_store_action_form.py b/backend/src/collective/volto/formsupport/tests/test_store_action_form.py new file mode 100644 index 0000000..5bf076d --- /dev/null +++ b/backend/src/collective/volto/formsupport/tests/test_store_action_form.py @@ -0,0 +1,317 @@ +from collective.volto.formsupport.testing import ( # noqa: E501, + VOLTO_FORMSUPPORT_API_FUNCTIONAL_TESTING, +) +from datetime import datetime +from io import StringIO +from plone import api +from plone.app.testing import setRoles +from plone.app.testing import SITE_OWNER_NAME +from plone.app.testing import SITE_OWNER_PASSWORD +from plone.app.testing import TEST_USER_ID +from plone.restapi.testing import RelativeSession +from Products.MailHost.interfaces import IMailHost +from zope.component import getUtility + +import csv +import transaction +import unittest + + +class TestMailStore(unittest.TestCase): + layer = VOLTO_FORMSUPPORT_API_FUNCTIONAL_TESTING + + def setUp(self): + self.app = self.layer["app"] + self.portal = self.layer["portal"] + self.portal_url = self.portal.absolute_url() + setRoles(self.portal, TEST_USER_ID, ["Manager"]) + + self.mailhost = getUtility(IMailHost) + + self.api_session = RelativeSession(self.portal_url) + self.api_session.headers.update({"Accept": "application/json"}) + self.api_session.auth = (SITE_OWNER_NAME, SITE_OWNER_PASSWORD) + self.anon_api_session = RelativeSession(self.portal_url) + self.anon_api_session.headers.update({"Accept": "application/json"}) + + self.document = api.content.create( + type="Document", + title="Example context", + container=self.portal, + ) + + self.document.blocks = { + "text-id": {"@type": "text"}, + "form-id": {"@type": "form"}, + } + self.document_url = self.document.absolute_url() + transaction.commit() + + def tearDown(self): + self.api_session.close() + self.anon_api_session.close() + + # set default block + self.document.blocks = { + "text-id": {"@type": "text"}, + "form-id": {"@type": "form"}, + } + transaction.commit() + + def submit_form(self, data): + url = f"{self.document_url}/@submit-form" + response = self.api_session.post( + url, + json=data, + ) + transaction.commit() + return response + + def export_data(self): + url = f"{self.document_url}/@form-data" + response = self.api_session.get(url) + return response + + def export_csv(self): + url = f"{self.document_url}/@form-data-export" + response = self.api_session.get(url) + return response + + def clear_data(self): + url = f"{self.document_url}/@form-data-clear" + response = self.api_session.delete(url) + return response + + def test_unable_to_store_data(self): + """form schema not defined, unable to store data""" + self.document.blocks = { + "form-id": { + "@type": "form", + "store": True, + }, + } + transaction.commit() + + response = self.submit_form( + data={ + "from": "john@doe.com", + "data": [ + { + "field_id": "message", + "label": "Message", + "value": "just want to say hi", + }, + {"field_id": "name", "label": "Name", "value": "John"}, + ], + "subject": "test subject", + "block_id": "form-id", + }, + ) + transaction.commit() + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["message"], "Empty form data.") + response = self.export_csv() + + def test_store_data(self): + self.document.blocks = { + "form-id": { + "@type": "form", + "store": True, + "subblocks": [ + { + "label": "Message", + "field_id": "message", + "field_type": "text", + }, + { + "label": "Name", + "field_id": "name", + "field_type": "text", + }, + ], + }, + } + transaction.commit() + + response = self.submit_form( + data={ + "from": "john@doe.com", + "data": [ + {"field_id": "message", "value": "just want to say hi"}, + {"field_id": "name", "value": "John"}, + {"field_id": "foo", "value": "skip this"}, + ], + "subject": "test subject", + "block_id": "form-id", + }, + ) + transaction.commit() + self.assertEqual(response.status_code, 200) + response = self.export_data() + data = response.json() + self.assertEqual(len(data["items"]), 1) + self.assertEqual( + sorted(data["items"][0].keys()), + ["__expired", "block_id", "date", "id", "message", "name"], + ) + self.assertEqual( + data["items"][0]["message"], + {"label": "Message", "value": "just want to say hi"}, + ) + self.assertEqual(data["items"][0]["name"], {"label": "Name", "value": "John"}) + response = self.submit_form( + data={ + "from": "sally@doe.com", + "data": [ + {"field_id": "message", "value": "bye"}, + {"field_id": "name", "value": "Sally"}, + ], + "subject": "test subject", + "block_id": "form-id", + }, + ) + transaction.commit() + self.assertEqual(response.status_code, 200) + response = self.export_data() + data = response.json() + self.assertEqual(len(data["items"]), 2) + self.assertEqual( + sorted(data["items"][0].keys()), + ["__expired", "block_id", "date", "id", "message", "name"], + ) + self.assertEqual( + sorted(data["items"][1].keys()), + ["__expired", "block_id", "date", "id", "message", "name"], + ) + sorted_data = sorted(data["items"], key=lambda x: x["name"]["value"]) + self.assertEqual(sorted_data[0]["name"]["value"], "John") + self.assertEqual(sorted_data[0]["message"]["value"], "just want to say hi") + self.assertEqual(sorted_data[1]["name"]["value"], "Sally") + self.assertEqual(sorted_data[1]["message"]["value"], "bye") + + # clear data + response = self.clear_data() + self.assertEqual(response.status_code, 204) + response = self.export_csv() + data = [*csv.reader(StringIO(response.text), delimiter=",")] + self.assertEqual(len(data), 1) + self.assertEqual(data[0], ["date"]) + + def test_export_csv(self): + self.document.blocks = { + "form-id": { + "@type": "form", + "store": True, + "subblocks": [ + { + "label": "Message", + "field_id": "message", + "field_type": "text", + }, + { + "label": "Name", + "field_id": "name", + "field_type": "text", + }, + ], + }, + } + transaction.commit() + response = self.submit_form( + data={ + "from": "john@doe.com", + "data": [ + {"field_id": "message", "value": "just want to say hi"}, + {"field_id": "name", "value": "John"}, + {"field_id": "foo", "value": "skip this"}, + ], + "subject": "test subject", + "block_id": "form-id", + }, + ) + + response = self.submit_form( + data={ + "from": "sally@doe.com", + "data": [ + {"field_id": "message", "value": "bye"}, + {"field_id": "name", "value": "Sally"}, + ], + "subject": "test subject", + "block_id": "form-id", + }, + ) + + self.assertEqual(response.status_code, 200) + response = self.export_csv() + data = [*csv.reader(StringIO(response.text), delimiter=",")] + self.assertEqual(len(data), 3) + self.assertEqual(data[0], ["Message", "Name", "date"]) + sorted_data = sorted(data[1:]) + self.assertEqual(sorted_data[0][:-1], ["bye", "Sally"]) + self.assertEqual(sorted_data[1][:-1], ["just want to say hi", "John"]) + + # check date column. Skip seconds because can change during test + now = datetime.now().strftime("%Y-%m-%dT%H:%M") + self.assertTrue(sorted_data[0][-1].startswith(now)) + self.assertTrue(sorted_data[1][-1].startswith(now)) + + def test_data_id_mapping(self): + self.document.blocks = { + "form-id": { + "@type": "form", + "store": True, + "test-field": "renamed-field", + "subblocks": [ + { + "field_id": "message", + "label": "Message", + "field_type": "text", + }, + { + "field_id": "test-field", + "label": "Test field", + "field_type": "text", + }, + ], + }, + } + transaction.commit() + response = self.submit_form( + data={ + "from": "john@doe.com", + "data": [ + {"field_id": "message", "value": "just want to say hi"}, + {"field_id": "test-field", "value": "John"}, + ], + "subject": "test subject", + "block_id": "form-id", + }, + ) + + response = self.submit_form( + data={ + "from": "sally@doe.com", + "data": [ + {"field_id": "message", "value": "bye"}, + {"field_id": "test-field", "value": "Sally"}, + ], + "subject": "test subject", + "block_id": "form-id", + }, + ) + + self.assertEqual(response.status_code, 200) + response = self.export_csv() + data = [*csv.reader(StringIO(response.text), delimiter=",")] + self.assertEqual(len(data), 3) + # Check that 'test-field' got renamed + self.assertEqual(data[0], ["Message", "renamed-field", "date"]) + sorted_data = sorted(data[1:]) + self.assertEqual(sorted_data[0][:-1], ["bye", "Sally"]) + self.assertEqual(sorted_data[1][:-1], ["just want to say hi", "John"]) + + # check date column. Skip seconds because can change during test + now = datetime.now().strftime("%Y-%m-%dT%H:%M") + self.assertTrue(sorted_data[0][-1].startswith(now)) + self.assertTrue(sorted_data[1][-1].startswith(now)) diff --git a/backend/src/collective/volto/formsupport/upgrades.py b/backend/src/collective/volto/formsupport/upgrades.py new file mode 100644 index 0000000..0eb7114 --- /dev/null +++ b/backend/src/collective/volto/formsupport/upgrades.py @@ -0,0 +1,233 @@ +from Acquisition import aq_base +from collective.volto.formsupport.interfaces import IFormDataStore +from copy import deepcopy +from plone import api +from plone.dexterity.utils import iterSchemata +from plone.i18n.normalizer.interfaces import IIDNormalizer +from souper.soup import Record +from zope.component import getMultiAdapter +from zope.component import getUtility +from zope.globalrequest import getRequest +from zope.schema import getFields + + +try: + from collective.volto.blocksfield.field import BlocksField + + HAS_BLOCKSFIELD = True +except ImportError: + HAS_BLOCKSFIELD = False + +from collective.volto.formsupport import logger + +import json + + +DEFAULT_PROFILE = "profile-collective.volto.formsupport:default" + + +def _has_block_form(block_data): + for block in block_data.values(): + if block.get("@type", "") == "form": + return True + return False + + +def _get_all_content_with_blocks(): + content = [] + + portal = api.portal.get() + portal_blocks = getattr(portal, "blocks", "") + if portal_blocks: + if _has_block_form(portal_blocks): + content.append(portal) + + pc = api.portal.get_tool(name="portal_catalog") + brains = pc() + total = len(brains) + + for i, brain in enumerate(brains): + if i % 100 == 0: + logger.info(f"Progress: {i + 1}/{total}") + item = brain.getObject() + for schema in iterSchemata(item.aq_base): + for name, field in getFields(schema).items(): + if name == "blocks": + if _has_block_form(getattr(item, "blocks", {})): + content.append(item) + + return content + + +def to_1100(context): # noqa: C901 # pragma: no cover + logger.info("### START CONVERSION FORM BLOCKS ###") + + def fix_block(blocks, url): + for block in blocks.values(): + if block.get("@type", "") != "form": + continue + found = False + for field in block.get("subblocks", []): + if field.get("field_type", "") == "checkbox": + field["field_type"] = "multiple_choice" + found = True + if field.get("field_type", "") == "radio": + field["field_type"] = "simple_choice" + found = True + if found: + logger.info(f"[CONVERTED] - {url}") + + # fix root + portal = api.portal.get() + portal_blocks = getattr(portal, "blocks", "") + if portal_blocks: + blocks = json.loads(portal_blocks) + fix_block(blocks, portal.absolute_url()) + portal.blocks = json.dumps(blocks) + + # fix blocks in contents + pc = api.portal.get_tool(name="portal_catalog") + brains = pc() + tot = len(brains) + i = 0 + for brain in brains: + i += 1 + if i % 1000 == 0: + logger.info(f"Progress: {i}/{tot}") + item = aq_base(brain.getObject()) + for schema in iterSchemata(item): + for name, field in getFields(schema).items(): + if name == "blocks": + blocks = deepcopy(item.blocks) + if blocks: + fix_block(blocks, brain.getURL()) + item.blocks = blocks + elif HAS_BLOCKSFIELD and isinstance(field, BlocksField): + value = deepcopy(field.get(item)) + if not value: + continue + if isinstance(value, str): + if value == "": + setattr( + item, + name, + {"blocks": {}, "blocks_layout": {"items": []}}, + ) + continue + if blocks: + fix_block(blocks, brain.getURL()) + setattr(item, name, value) + + +def to_1200(context): # noqa: C901 # pragma: no cover + logger.info("### START CONVERSION STORED DATA ###") + + def get_field_info_from_block(block, field_id): + normalizer = getUtility(IIDNormalizer) + for field in block.get("subblocks", []): + normalized_label = normalizer.normalize(field.get("label", "")) + if field_id == normalized_label: + return {"id": field["field_id"], "label": field.get("label", "")} + elif field_id == field["field_id"]: + return {"id": field["field_id"], "label": field.get("label", "")} + return {"id": field_id, "label": field_id} + + def fix_data(blocks, context): + fixed = False + for block in blocks.values(): + if block.get("@type", "") != "form": + continue + if not block.get("store", False): + continue + store = getMultiAdapter((context, getRequest()), IFormDataStore) + fixed = True + data = store.search() + for record in data: + labels = {} + new_record = Record() + for k, v in record.attrs.items(): + new_id = get_field_info_from_block(block=block, field_id=k) + new_record.attrs[new_id["id"]] = v + labels.update({new_id["id"]: new_id["label"]}) + new_record.attrs["fields_labels"] = labels + # create new entry + store.soup.add(new_record) + # remove old one + store.delete(record.intid) + return fixed + + fixed_contents = [] + # fix root + portal = api.portal.get() + portal_blocks = getattr(portal, "blocks", "") + if portal_blocks: + blocks = json.loads(portal_blocks) + res = fix_data(blocks, portal) + if res: + fixed_contents.append("/") + + # fix blocks in contents + pc = api.portal.get_tool(name="portal_catalog") + brains = pc() + tot = len(brains) + i = 0 + for brain in brains: + i += 1 + if i % 100 == 0: + logger.info(f"Progress: {i}/{tot}") + item = brain.getObject() + for schema in iterSchemata(item.aq_base): + for name, field in getFields(schema).items(): + if name == "blocks": + blocks = getattr(item, "blocks", {}) + if blocks: + res = fix_data(blocks, item) + if res: + fixed_contents.append(brain.getPath()) + elif HAS_BLOCKSFIELD and isinstance(field, BlocksField): + value = field.get(item) + if not value: + continue + if isinstance(value, str): + continue + blocks = value.get("blocks", {}) + if blocks: + res = fix_data(blocks, item) + if res: + fixed_contents.append(brain.getPath()) + logger.info(f"Fixed {len(fixed_contents)} contents:") + for path in fixed_contents: + logger.info(f"- {path}") + + +def to_1300(context): # noqa: C901 # pragma: no cover + def update_send_from_bool_to_list_for_content(item): + blocks = ( + item.blocks + ) # We've already checked we've a form block so no need to guard here + + for block in blocks.values(): + if block.get("@type", "") != "form": + continue + send = block.get("send") + if isinstance(send, bool): + new_send_value = ["recipient"] if block.get("send") else [] + block["send"] = new_send_value + logger.info( + "[CONVERTED] - {} form send value from {} to {}".format( + item, send, new_send_value + ) + ) + + item.blocks = blocks + + logger.info("### START UPGRADE SEND FROM STRING TO ARRAY ###") + + content = _get_all_content_with_blocks() + + for item in content: + update_send_from_bool_to_list_for_content(item) + + logger.info("### FINISHED UPGRADE SEND FROM STRING TO ARRAY ###") + + # self.block.get("send") diff --git a/backend/src/collective/volto/formsupport/upgrades.zcml b/backend/src/collective/volto/formsupport/upgrades.zcml new file mode 100644 index 0000000..5943a3f --- /dev/null +++ b/backend/src/collective/volto/formsupport/upgrades.zcml @@ -0,0 +1,35 @@ +