Skip to content

Commit

Permalink
[ATO-2122] Backport rasa export Kafka bugfix to 3.6.x (#13017)
Browse files Browse the repository at this point in the history
* backport bugfix, update CI workflow

* add changelog entry, update dead link in Docker build docs
  • Loading branch information
ancalita committed Feb 23, 2024
1 parent 2fe0a9c commit c169e84
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 25 deletions.
110 changes: 94 additions & 16 deletions .github/workflows/continous-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -618,10 +618,6 @@ jobs:
POSTGRES_PORT: 5432
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
RABBITMQ_HOST: localhost
RABBITMQ_PORT: 5672
RABBITMQ_USER: guest
RABBITMQ_PASSWORD: guest

services:
redis:
Expand Down Expand Up @@ -653,12 +649,6 @@ jobs:
# mapping container ports to the host
- 5432:5432

rabbitmq:
# see https://github.com/docker-library/healthcheck/blob/master/rabbitmq/docker-healthcheck
image: healthcheck/rabbitmq
ports:
- 5672:5672

mongodb:
image: mongodb/mongodb-community-server:6.0.4-ubuntu2204
options: >-
Expand Down Expand Up @@ -728,6 +718,94 @@ jobs:
if grep 'The lock file is not up to date' .output; then exit 1; fi
make prepare-tests-ubuntu
- name: Test Code with Services 🩺
if: needs.changes.outputs.backend == 'true'
env:
JOBS: 2
INTEGRATION_TEST_PYTEST_MARKERS: '"(not sequential) and (not broker)"'
PYTHONIOENCODING: "utf-8"
run: |
make test-integration
broker_integration_test:
name: Run Broker Integration Tests
if: github.ref_type != 'tag'
runs-on: ubuntu-22.04
timeout-minutes: 60
needs: [changes]
env:
RABBITMQ_HOST: localhost
RABBITMQ_PORT: 5672
RABBITMQ_USER: guest
RABBITMQ_PASSWORD: guest

services:
rabbitmq:
# see https://github.com/docker-library/healthcheck/blob/master/rabbitmq/docker-healthcheck
image: healthcheck/rabbitmq
ports:
- 5672:5672

steps:
- name: Checkout git repository 🕝
if: needs.changes.outputs.backend == 'true'
uses: actions/checkout@ac593985615ec2ede58e132d2e21d2b1cbd6127c

- name: Set up Python ${{ env.DEFAULT_PYTHON_VERSION }} 🐍
if: needs.changes.outputs.backend == 'true'
uses: actions/setup-python@57ded4d7d5e986d7296eab16560982c6dd7c923b
with:
python-version: ${{ env.DEFAULT_PYTHON_VERSION }}

- name: Read Poetry Version 🔢
if: needs.changes.outputs.backend == 'true'
run: |
echo "POETRY_VERSION=$(scripts/poetry-version.sh)" >> $GITHUB_ENV
shell: bash

- name: Install poetry 🦄
if: needs.changes.outputs.backend == 'true'
uses: Gr1N/setup-poetry@15821dc8a61bc630db542ae4baf6a7c19a994844 # v8
with:
poetry-version: ${{ env.POETRY_VERSION }}

- name: Load Poetry Cached Libraries ⬇
id: cache-poetry
if: needs.changes.outputs.backend == 'true'
uses: actions/cache@88522ab9f39a2ea568f7027eddc7d8d8bc9d59c8
with:
path: .venv
key: ${{ runner.os }}-poetry-${{ env.POETRY_VERSION }}-${{ env.DEFAULT_PYTHON_VERSION }}-${{ hashFiles('**/poetry.lock') }}-venv-${{ secrets.POETRY_CACHE_VERSION }}-${{ env.pythonLocation }}

- name: Clear Poetry cache
if: steps.cache-poetry.outputs.cache-hit == 'true' && needs.changes.outputs.backend == 'true' && contains(github.event.pull_request.labels.*.name, 'tools:clear-poetry-cache-unit-tests')
run: rm -r .venv

# Poetry >= 1.1.0b uses virtualenv to create a virtual environment.
# The virtualenv simply doesn't work on Windows with our setup,
# that's why we use venv to create virtual environment
- name: Create virtual environment
if: (steps.cache-poetry.outputs.cache-hit != 'true' || contains(github.event.pull_request.labels.*.name, 'tools:clear-poetry-cache-unit-tests')) && needs.changes.outputs.backend == 'true'
run: python -m venv create .venv

- name: Set up virtual environment
if: needs.changes.outputs.backend == 'true'
# Poetry on Windows cannot pick up the virtual environments directory properly,
# and it creates a new one every time the pipeline runs.
# This step solves this problem — it tells poetry to always use `.venv` directory inside
# the project itself, which also makes it easier for us to determine the correct directory
# that needs to be cached.
run: poetry config virtualenvs.in-project true

- name: Install Dependencies (Linux) 📦
if: needs.changes.outputs.backend == 'true'
run: |
sudo apt-get -y install libpq-dev
make install-full | tee .output
if grep 'The lock file is not up to date' .output; then exit 1; fi
make prepare-tests-ubuntu
make prepare-spacy
- name: Run kafka and zookeeper containers for integration testing
if: needs.changes.outputs.backend == 'true'
run: |
Expand All @@ -737,11 +815,16 @@ jobs:
if: needs.changes.outputs.backend == 'true'
env:
JOBS: 2
INTEGRATION_TEST_PYTEST_MARKERS: '"not sequential"'
INTEGRATION_TEST_PYTEST_MARKERS: "broker"
PYTHONIOENCODING: "utf-8"
run: |
make test-integration
- name: Stop kafka and zookeeper containers for integration testing
if: needs.changes.outputs.backend == 'true'
run: |
docker-compose -f tests_deployment/docker-compose.kafka.yml down
sequential_integration_test:
name: Run Sequential Integration Tests
if: github.ref_type != 'tag'
Expand Down Expand Up @@ -841,11 +924,6 @@ jobs:
run: |
make test-integration
- name: Stop kafka and zookeeper containers for integration testing
if: needs.changes.outputs.backend == 'true'
run: |
docker-compose -f tests_deployment/docker-compose.kafka.yml down
build_docker_base_images_and_set_env:
name: Build Docker base images and setup environment
runs-on: ubuntu-22.04
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

JOBS ?= 1
INTEGRATION_TEST_FOLDER = tests/integration_tests/
INTEGRATION_TEST_PYTEST_MARKERS ?= "sequential or not sequential"
INTEGRATION_TEST_PYTEST_MARKERS ?= "sequential or broker or ((not sequential) and (not broker))"
PLATFORM ?= "linux/amd64"

help:
Expand Down
1 change: 1 addition & 0 deletions changelog/13017.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Flush messages when Kafka producer is closed. This is to ensure that all messages in the producer's internal queue are sent to the broker.
2 changes: 1 addition & 1 deletion docs/docs/docker/building-in-docker.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ The initial project files should all be there, as well as a `models` directory t
:::note
If you run into permission errors, it may be because the `rasa/rasa` images
run as user `1001` as a best practice, to avoid giving the container `root` permissions.
Hence, all files created by these containers will be owned by user `1001`. See the [Docker documentation](https://docs.docker.com/edge/engine/reference/commandline/run/)
Hence, all files created by these containers will be owned by user `1001`. See the [Docker documentation](https://docs.docker.com/reference/cli/docker/container/run/)
if you want to run the containers as a different user.

:::
Expand Down
4 changes: 3 additions & 1 deletion rasa/core/brokers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,11 @@ def _publish(self, event: Dict[Text, Any]) -> None:
on_delivery=delivery_report,
)

def _close(self) -> None:
async def close(self) -> None:
self._cancelled = True
self._poll_thread.join()
if self.producer:
self.producer.flush()

@rasa.shared.utils.common.lazy_property
def rasa_environment(self) -> Optional[Text]:
Expand Down
14 changes: 8 additions & 6 deletions tests/integration_tests/core/brokers/test_kafka.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import pytest

from rasa.core.brokers.kafka import KafkaEventBroker
from pytest import LogCaptureFixture
import logging.config


def test_kafka_event_broker_valid():
@pytest.mark.broker
async def test_kafka_event_broker_valid():
broker = KafkaEventBroker(
url="localhost",
topic="rasa",
Expand All @@ -19,11 +22,11 @@ def test_kafka_event_broker_valid():
)
assert broker.producer.poll() == 1
finally:
broker.producer.flush()
broker._close()
await broker.close()


def test_kafka_event_broker_buffer_error_is_handled(caplog: LogCaptureFixture):
@pytest.mark.broker
async def test_kafka_event_broker_buffer_error_is_handled(caplog: LogCaptureFixture):
broker = KafkaEventBroker(
url="localhost",
topic="rasa",
Expand All @@ -48,5 +51,4 @@ def test_kafka_event_broker_buffer_error_is_handled(caplog: LogCaptureFixture):
assert "Queue full" in caplog.text
assert broker.producer.poll() == 1
finally:
broker.producer.flush()
broker._close()
await broker.close()
3 changes: 3 additions & 0 deletions tests/integration_tests/core/brokers/test_pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)


@pytest.mark.broker
async def test_pika_event_broker_connect():
broker = PikaEventBroker(
host=RABBITMQ_HOST,
Expand All @@ -31,6 +32,7 @@ async def test_pika_event_broker_connect():
await broker.close()


@pytest.mark.broker
@pytest.mark.xdist_group("rabbitmq")
async def test_pika_event_broker_publish_after_restart(
docker_client: docker.DockerClient,
Expand Down Expand Up @@ -102,6 +104,7 @@ async def test_pika_event_broker_publish_after_restart(
rabbitmq_container.remove()


@pytest.mark.broker
@pytest.mark.xdist_group("rabbitmq")
@pytest.mark.parametrize("host_component", ["localhost", "myuser:mypassword@localhost"])
async def test_pika_event_broker_connect_with_path_and_query_params_in_url(
Expand Down
117 changes: 117 additions & 0 deletions tests/integration_tests/core/test_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import textwrap
from pathlib import Path
from unittest.mock import Mock

import pytest

from pytest import MonkeyPatch

from rasa.core.brokers.kafka import KafkaEventBroker
from rasa.core.exporter import Exporter
from rasa.core.tracker_store import InMemoryTrackerStore
from rasa.shared.core.domain import Domain
from rasa.shared.core.events import ActionExecuted
from rasa.shared.core.trackers import DialogueStateTracker


@pytest.mark.broker
async def test_exporter_publishes_to_kafka_broker_success(
tmp_path: Path,
) -> None:
tracker_store = InMemoryTrackerStore(domain=Domain.empty())
tracker = DialogueStateTracker.from_events(
"test_export",
[
ActionExecuted("action_listen"),
],
)

await tracker_store.save(tracker)

kafka_broker = KafkaEventBroker(
url="localhost",
topic="rasa",
sasl_username="admin",
sasl_password="password",
partition_by_sender=True,
)

endpoints_file = tmp_path / "endpoints.yml"
endpoints_file.write_text(
textwrap.dedent(
"""
event_broker:
type: kafka
topic: rasa
url: localhost:9092
client_id: kafka-python-rasa
partition_by_sender: true
security_protocol: SASL_PLAINTEXT
sasl_username: admin
sasl_password: password
sasl_mechanism: PLAIN
"""
)
)

exporter = Exporter(tracker_store, kafka_broker, str(endpoints_file))

published_events = await exporter.publish_events()
assert published_events == 1


@pytest.mark.broker
async def test_exporter_publishes_to_kafka_broker_fail(
tmp_path: Path,
monkeypatch: MonkeyPatch,
) -> None:
tracker_store = InMemoryTrackerStore(domain=Domain.empty())
tracker = DialogueStateTracker.from_events(
"test_export",
[
ActionExecuted("action_listen"),
],
)

await tracker_store.save(tracker)

kafka_broker = KafkaEventBroker(
url="localhost",
topic="rasa",
sasl_username="admin",
sasl_password="password",
partition_by_sender=True,
)

endpoints_file = tmp_path / "endpoints.yml"
endpoints_file.write_text(
textwrap.dedent(
"""
event_broker:
type: kafka
topic: rasa
url: localhost:9092
client_id: kafka-python-rasa
partition_by_sender: true
security_protocol: SASL_PLAINTEXT
sasl_username: admin
sasl_password: password
sasl_mechanism: PLAIN
"""
)
)

exporter = Exporter(tracker_store, kafka_broker, str(endpoints_file))

# patch the exporter to raise an exception when publishing events
monkeypatch.setattr(exporter, "publish_events", Mock(side_effect=Exception))

with pytest.raises(Exception) as error:
await exporter.publish_events()
assert "Producer terminating with 1 messages" in str(error.value)
assert (
"still in queue or transit: use flush() to wait for "
"outstanding message delivery" in str(error.value)
)
# necessary for producer teardown
await kafka_broker.close()

0 comments on commit c169e84

Please sign in to comment.