
2024-05-10
Symfony 6 Kafka RabbitMQ Docker compose сборки с описанием
2023-11-17
Итак друзья. Ни для кого не секрет, что в php нет как таковой многопоточности в ее привычном представлении. Здесь многопоточность заменяют очереди, которые можно условно назвать "многопроцессность".
Для тех кто ни разу не сталкивался с очередями дам немного вводных. Очереди используются для асинхронного выполнения какой-либо логики в параллель с другими такими же процессами. В основном этот подход используется в высоконагруженных приложениях для обработки большого объема информации или для выполнения различных задач в большом количестве. Суть этой схемы такова, что есть условная очередь, как в магазине, есть producer - та компонента, которая ставит задачи в очередь и есть consumer'ы, которые разбирают эту очередь(и). Продюсеров и конзумеров может быть столько, сколько вам позволит железо вашего сервера. В этой статье я не буду расписывать особенности работы с демонами, здесь я хочу разобрать выбор инструментов, а также дать два комплекта сборок docker-compose.yaml файлов для Symfony 6 под два самых популярных решения.
Итак, на рынке есть два самых популярных решения для очередей - это kafka и rabbitmq. Поверхностно они выполняют одно и то же, но между ними есть ощутимые различия, которые необходимо внимательно продумать прежде чем делать какой-либо выбор в ту или иную сторону. Сразу забегая вперед скажу, что здесь не говорится о том, что кто-то плохой, а кто-то хороший - у каждого из них есть свои преимущества и свои недостатки, которые мы как раз сейчас и разберем.
Начнем с RabbiMq. Он написан по принципу "Умный продюсер, тупой конзумер". Это значит, что продюсер берет на себя львиную долю всей работы над контролем очередей, например следит за прочитанными сообщениями в очереди, удаляет их, организует процесс распределения сообщений между конзумерами и т.д. RabbitMq работает по протоколу AMQP, но при этом поддерживает и многие другие протоколы, такие как MQTT, STOMP, HTTP.
Кратко работу кролика ( Rabbit Mq ) можно расписать так:
1) Продюсер отправляет сообщение в обменник;
2) Обменник отправляет сообщения по очередям и в другие обменники;
3) Конзумеры поддерживают постоянное TCP соединение с кроликом и слушают определенную очередь;
4) Как только в очереди появляется сообщение - свободный конзумер его подхватывает и отправлет подтверждение об успешном получении;
5) После успешного получения сообщение удаляется из очереди;
Kafka в отличие от Кролика работает по принципу "Умный конзумер, тупой продюсер". Кафка не занимается контролем и распределением сообщений. Продюсер не выполняет никакой дополнительной логики в брокере. Благодаря этому Кафка имеет намного более высокую пропускную способность данных при более низкой задержке для обработки потоков в реальном времени.
Несмотря на то, что в текущем контексте и кафка и кролик рассматриваются как брокеры очередей, Кафка имеет принципиально другую схему работы и более сложную структуру. Процесс работы кафки можно было бы кратко описать следующим образом:
1) Продюсер отправляет сообщение в топик;
2) В каждом топике есть партиции, одна или более, которые разделяют очереди, чтобы убересь топик от переполнения;
3) Брокер ( сервер кафки ) отвечает за прием и хранение сообщений с последующей передачей конзумеру;
4) Конзумер получает одно или несколько сообщений, но в отличие от кролика - сам приходит за сообщениями, а не получает их принудительно;
Кафка, пожалуй, самый популярный инструмент для работы с потоковыми данными. Поэтому он может хорошо применяться в следующих сценариях:
-- Сбор логов
-- Обработка потоков
-- Брокер очередей ( там где нужна большая скорость )
-- Обработка огромных потоков данных в реальном времени с частотой сообщений от тысяч до миллионов сообщений в секунду
-- Если несколько конзумеров должны получать все сообщения
Кролик идеален для более стандарнтных очередей или брокера сообщений
-- Если нужна сложная маршрутизация
-- Может стать отличным выбором для посылки краткосрочных сообщений между микросервисами
-- Отлично подходит, когда нужна поддержка разных протоколов
-- Может быть хорошим выбором для постобработки данных, где не так важна фееричная скорость, но важна простота настройки и поддержки
Также никто не ограничивает вас в том, чтобы использовать оба эти инструмента и использовать их под конкретные задачи.
Также хотелось бы отметить со стороны разработчика, что RabbitMq более простой в плане настроек и практически сразу работает из коробки, у него отличная документация, развитое сообщение, много плагинов, которые расширяют его возможности. Поэтому, если вы только знакомитесь с функционалом очередей - это будет оптимальный выбор. Кафка в плане настройки и поддержки ощутимо сложнее, в интернете про нее информация более разрозненная и собрать ее в рабочее состояние ощутимо сложнее, даже уже имея опыт с кроликом.
Итак, теперь, когда мы определелились с выбором приступаем к сборке приложения на докере.
На всякий случай приложу ссылку на установку докера.
Предполагается, что у вас уже есть установленное приложение Symfony 6.3
Давайте теперь разберемся с докером. У меня есть несколько работоспособных сборок для докера под разные проекты и под разные нужды. Их можно комбинировать между собой, создавая новые сборки, но начнем с довольно стандартной сборки на следующем стеке:
-- Symfony 6.3
-- Xdebug
-- php 8.1
-- Nginx
-- mysql percona 8
-- rabbitMq
-- mailcatcher
-- postfix
-- dovecot
Для такой сборки файл docker-compose.yml будет выглядеть следующим образом:
version: '3.1'
services:
project-name-php-fpm:
build:
context: ./php-fpm
dockerfile: Dockerfile
args:
# # для VS Code:
# XDEBUG_CLIENT_HOST: host.docker.internal
XDEBUG_CLIENT_HOST: 172.26.0.1
XDEBUG_CLIENT_PORT: 9000
image: php-fpm
restart: always
volumes:
- ../:/app
- ./tmp:/tmp/tmp:rw
working_dir: /app
container_name: 'project-name-php-fpm'
hostname: 'localhost'
networks:
project-network:
ipv4_address: 172.26.0.3
environment:
XDEBUG_CONFIG: client_host=172.26.0.1 client_port=9000 remote_enable=1
PHP_IDE_CONFIG: serverName=localhost
DEV: 1
# # Параметр для запуска в VS Code
# extra_hosts:
# - "host.docker.internal:host-gateway"
project-name-nginx:
image: nginx:1.15.0
container_name: 'project-name-nginx'
working_dir: /app
ports:
- '23080:80'
- '23081:81'
networks:
project-network:
ipv4_address: 172.26.0.4
restart: always
volumes:
- ../:/app
- ./nginx/nginx.conf:/etc/nginx/conf.d/default.conf
project-name-db:
container_name: project-name-db
image: percona:8
user: root
restart: always
command: ['mysqld', '--user=root', "--sql-mode=STRICT_TRANS_TABLES,ALLOW_INVALID_DATES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION"]
environment:
MYSQL_DATABASE: project-name
MYSQL_ROOT_PASSWORD: pass
ports:
- '23082:3306'
volumes:
- ../:/app
- ./data/mysql:/var/lib/mysql:rw
networks:
project-network:
ipv4_address: 172.26.0.5
project-name-rabbit-mq:
build:
context: ./rabbitmq
dockerfile: Dockerfile
#image: rabbitmq:3.10.7-management
working_dir: /app
hostname: 'localhost'
restart: always
container_name: 'project-name-rabbit-mq'
networks:
project-network:
ipv4_address: 172.26.0.6
ports:
- '23672:15672'
- '23673:5672'
environment:
RABBITMQ_DEFAULT_USER: 'user'
RABBITMQ_DEFAULT_PASS: 'password'
RABBITMQ_DEFAULT_VHOST: '/'
project-name-mailcatcher:
container_name: project-name-mailcatcher
image: schickling/mailcatcher
ports:
- '23180:1080'
- '23125:1025'
networks:
project-network:
ipv4_address: 172.26.0.14
aliases:
- mailcatcher
project-name-postfix:
container_name: project-name-postfix
build: ./postfix
depends_on:
- project-name-dovecot
environment:
smtp_user: user:password
volumes:
- ./postfix/scripts:/root/postfix:rw
- ./postfix/config/main.cf:/etc/postfix/main.cf
- project-name-postfix-dovecot:/shared
ports:
- "23025:25"
networks:
project-network:
ipv4_address: 172.26.0.13
aliases:
- mail.project-name.docker
project-name-dovecot:
container_name: project-name-dovecot
build: ./dovecot
volumes:
- ./dovecot/config/dovecot.conf:/etc/dovecot/dovecot.conf
- project-name-postfix-dovecot:/shared
ports:
- "23110:110"
- "23143:143"
networks:
project-network:
ipv4_address: 172.26.0.12
aliases:
- dovecot.docker
volumes:
mysql:
project-name-data: {}
project-name-postfix-dovecot: {}
networks:
project-network:
driver: bridge
ipam:
config:
- subnet: 172.26.0.0/16
В корне проекта с Symfony создайте папочку docker со следующей структурой:
docker
├── data
│ ├── mysql
│ └── .gitignore
├── dovecot
│ ├── config
│ │ └── dovecot.conf
│ └── Dockerfile
├── nginx
│ └── nginx.conf
├── php-fpm
│ └── Dockerfile
├── postfix
│ ├── config
│ │ └── main.cf
│ ├── scripts
│ │ └── mail_queue.sh
│ └── Dockerfile
├── rabbitmq
│ ├── Dockerfile
│ └── rabbitmq_delayed_message_exchange-3.10.2.ez
├── tmp
└── docker-compose.yml
Makefile
По этой ссылке вы можете скачать все эти файлы в zip архиве.
Распакуйте zip архив в корень Symfony проекта и переместите Makefile из папки docker в корень проекта. Это можно сделать командой
mv ./docker/Makefile ./Makefile
Makefile содержит самые необходимые команды для управления докером:
make build - билд образов
make rebuild - ребилд образов без кэша
make up - подъем докера
make up-rebuild - подъем докера с ребилдом образов
make up-alone - подъем докера с тушением всех других, что не объявлены в yml
make down - остановить все контейнеры
make restart - рестарт всех контейнеров
make down-all - остановить все контейнеры включая не объявленные в yml
make down-v - остановить все контейнеры с удалением данных
make connect-to-php-fpm - подключение внутрь php-контейнера
make drop-database - удаление БД SF
make migrations - прогон миграций SF
make migration-diff - создание миграции через diff SF
make cache-clear - очистка кэша без прогрева SF
make cache-warmup - прогрев кэша SF
make clear-logs - очистка логов SF
make test - запуск автотестов codeception
make help - вызов справки
Прежде чем запускать докер проверьте, что у вас свободны указанные порты и имена контейнеров не конфликтуют с другими запущенными сборками. После этого можно в корне проекта запустить команду make up-rebuild - в этот момент скачаются образы и запустится докер.
Теперь можно перейти к установке бандла для кролика и его настройке. Мы будем использовать бандл php-amqplib/rabbitmq-bundle, так как он проверен на бою, показал свою эффективность, его часто обновляют и поддерживают в актуальном состоянии, он прост в настройке, а так же у него отличная документация, которая позволит настроить всё максимально детально. На момент написания статьи актуальная версия бандла 2.13.1.
Устанавливаем бандл командой
composer require php-amqplib/rabbitmq-bundle
После этого у нас появится конфигурационный файл config/packages/old_sound_rabbit_mq.yaml, где есть базовая часть настроек, а так же список продюсеров и конзумеров.
old_sound_rabbit_mq:
connections:
default:
host: '%env(resolve:RABBITMQ_HOST)%'
port: '%env(resolve:RABBITMQ_PORT)%'
user: '%env(resolve:RABBITMQ_USER)%'
password: '%env(resolve:RABBITMQ_PASSWORD)%'
vhost: '%env(resolve:RABBITMQ_VHOST)%'
lazy: true
connection_timeout: 60
read_write_timeout: 60
keepalive: true
heartbeat: 0
use_socket: false
producers:
process_sms_notification:
connection: default
exchange_options:
name: 'process_sms_notification'
type: 'x-delayed-message'
passive: false
durable: true
auto_delete: false
internal: false
nowait: false
declare: true
ticket: null
arguments:
x-delayed-type: [ 'S', 'direct' ]
x-max-length: [ 'I', '1000000' ]
queue_options:
name: 'process_sms_notification'
passive: false
durable: true
exclusive: false
auto_delete: false
nowait: false
declare: true
ticket: null
arguments:
x-delayed-type: [ 'S', 'direct' ]
x-max-length: [ 'I', '1000000' ]
consumers:
process_sms_notification:
connection: default
exchange_options:
name: 'process_sms_notification'
type: 'x-delayed-message'
passive: false
durable: true
auto_delete: false
internal: false
nowait: false
declare: true
ticket: null
arguments:
x-delayed-type: [ 'S', 'direct' ]
x-max-length: [ 'I', '1000000' ]
queue_options:
name: 'process_sms_notification'
arguments:
x-max-length: [ 'I', '1000000' ]
callback: process_sms_notification_consumer
qos_options:
prefetch_size: 0
prefetch_count: 1
global: false
graceful_max_execution:
timeout: 1800
exit_code: 0
Выше описан пример с настройками. В данной статье я не буду расписывать каждый параметр по отдельности - про них можно почитать на официальном сайте rabbitmq в разделе настроек.
Помимо вышеуказанных настроек в файле бандла вам нужно будет объявить сервис process_sms_notification_consumer, который будет как раз вашим конзумером, а также параметр, который будет у нас в базовом конзумере. Сделать это можно в config/services.yaml следующим образом:
parameters:
commonBaseConsumerMaxAttempts: '%env(int:PLATFORM_BASE_CONSUMER_MAX_ATTEMPTS)%'
services:
_defaults:
autowire: true
autoconfigure: true
public: true
process_sms_notification_consumer:
class: App\Consumer\ProcessSmsNotificationConsumer
autowire: true
Теперь нам нужно создать BaseConsumer и сам конзумер для смс нотификации.
У нас будет App\Consumer\BaseConsumer, располагающийся в src/Consumer/BaseConsumer.php
<?php
namespace App\Consumer;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\ORM\Exception\EntityManagerClosed;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
use Throwable;
abstract class BaseConsumer
{
public function __construct(
protected EntityManagerInterface $em,
protected LoggerInterface $logger,
protected int $commonBaseConsumerMaxAttempts
) {
}
abstract protected function process(array $data): void;
abstract protected function getDelayedProducer(): ProducerInterface;
protected function getDelayWhenError(): int
{
return 1000 * 60 * 5;
}
public static function getDelayMessageHeaders(int $seconds = 0, int $minutes = 0, int $hours = 0): array
{
$time = 0;
$time += 1000 * $seconds;
$time += 1000 * 60 * $minutes;
$time += 1000 * 60 * 60 * $hours;
return ['x-delay' => $time];
}
public function execute(AMQPMessage $msg): void
{
$data = json_decode($msg->getBody(), true);
try {
$this->em->getConnection()->connect();
$this->process($data);
} catch (Throwable $e) {
$this->catchExceptionAction($data, $e);
}
unset($data);
$this->em->clear();
$this->em->getConnection()->close();
gc_collect_cycles();
gc_mem_caches();
}
private function catchExceptionAction(array $data, Throwable $e): void
{
$log = sprintf(
'Class = %s; Line = %s; ErrorMessage = %s;' . PHP_EOL . 'ErrorTrace: %s',
$e->getFile(),
$e->getLine(),
$e->getMessage(),
$e->getTraceAsString()
);
if (!isset($data['attempt'])) {
$data['attempt'] = 1;
}
$log .= ' attempt=' . $data['attempt'];
$data['attempt'] = $data['attempt'] + 1;
if (!$e instanceof EntityManagerClosed) {
$this->logger->warning($log);
}
if ($data['attempt'] < $this->commonBaseConsumerMaxAttempts) {
$this->getDelayedProducer()->publish(msgBody: json_encode($data), headers: ['x-delay' => $this->getDelayWhenError()]);
} else {
$this->logger->critical($log . ' Attempts ended.');
}
}
}
И теперь создаем наш конзумер для обработки SMS-сообщений App\Consumer\ProcessSmsNotificationConsumer в файле src/Consumer/ProcessSmsNotificationConsumer.php
<?php
namespace App\Integration\AliExpress\Consumer;
use App\Consumer\BaseConsumer;
use Doctrine\ORM\EntityManagerInterface;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\Attribute\Autowire;
class ProcessSmsNotificationConsumer extends BaseConsumer
{
public function __construct(
EntityManagerInterface $em,
LoggerInterface $logger,
#[Autowire('%commonBaseConsumerMaxAttempts%')] int $commonBaseConsumerMaxAttempts,
private readonly ProducerInterface $processSmsNotificationProducer,
) {
parent::__construct($em, $logger, $commonBaseConsumerMaxAttempts);
}
protected function process(array $data): void
{
// реализуйте тут логику
// в массиве $data будет лежать то, что было передано в сообщении
// здесь вы можете выбросить исключение и за счет
// логики в базовом контроллере вы сможете
// перепоставить задачку в очередь снова
}
protected function getDelayedProducer(): ProducerInterface
{
return $this->processSmsNotificationProducer;
}
}
Так же вам нужно будет добавить новые переменные в .env файл
...
RABBITMQ_HOST='project-name-rabbit-mq'
RABBITMQ_PORT='5672'
RABBITMQ_USER='user'
RABBITMQ_PASSWORD='password'
RABBITMQ_VHOST='/'
PLATFORM_BASE_CONSUMER_MAX_ATTEMPTS=2
...
Теперь мы можем создать для примера консольную команду, которая будет отправлять задания в очередь кролика:
<?php
namespace App\Command;
use App\Consumer\BaseConsumer;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Attribute\AsCommand;
#[AsCommand(name: 'test:test')]
class TestCommand extends Command
{
public function __construct(
private readonly ProducerInterface $processSmsNotificationProducer,
) {
parent::__construct();
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->processSmsNotificationProducer->publish(
json_encode([
'tel_number' => '+7 999 999 99 99',
'sms_text' => 'some awesome text here',
]),
headers: BaseConsumer::getDelayMessageHeaders(minutes: 5)
);
return 0;
}
}
Выше описана консольная команда, которая в своем теле отправляет сообщение в очередь кролика в отложенную на 5 минут очередь. Если вам не нужна отложка - то можно просто не передавать данные заголовки и оставить второй параметр пустым. В этом случае сообщение будет доставлено в очередь сразу же.
Чтобы запусктить и проверить работоспособность нужно в консоли php-fpm контейнера докера запустить в одном окне команду
bin/console rabbitmq:consumer process_sms_notification
А в соседней консоли запустить
bin/console test:test
На этом ваш кролик готов к функционированию. Вы можете создавать столько очередей, сколько позволит вам железо вашего сервера. В проде вам нужно будет поставить команду запуска очереди ( bin/console rabbitmq:consumer ) например под супервизор или под номад. Эта команда будет работать как демон и обрабатывать ваши очереди.
У кролика также и локально и в проде есть интерфейс для мониторинга очередей и их настройки, который локально будет доступен по урлу http://localhost:23672/ с пользователем по-умолчанию guest:guest
Я намеренно не использую в паре с кроликом CQRS от симфони, потому что считаю выше-описанный подход более удобным в такой вот конфигурации.
Предполагается, что у вас уже есть установленное приложение Symfony 6.3
Эта сборка немного отличается от предыдущей и содержит в себе вот такой вот стек:
-- Symfony 6.3
-- Xdebug
-- php 8.1
-- Nginx
-- mysql percona 8
-- kafka
-- memcached
-- mailcatcher
-- postfix
-- dovecot
Для такой сборки файл docker-compose.yml будет выглядеть следующим образом:
version: '3.1'
services:
template-project-php-fpm:
build:
context: ./php-fpm
dockerfile: Dockerfile
args:
XDEBUG_CLIENT_HOST: 172.59.0.1
XDEBUG_CLIENT_PORT: 9000
image: php-fpm
restart: always
volumes:
- ../:/app
- ./tmp:/tmp/tmp:rw
working_dir: /app
container_name: 'template-project-php-fpm'
hostname: 'localhost'
networks:
template-project:
ipv4_address: 172.59.0.3
environment:
XDEBUG_CONFIG: client_host=172.59.0.1 client_port=9000 remote_enable=1
PHP_IDE_CONFIG: serverName=localhost
DEV: 1
template-project-nginx:
image: nginx:1.15.0
container_name: 'template-project-nginx'
working_dir: /app
ports:
- '28080:80'
- '28081:81'
networks:
template-project:
ipv4_address: 172.59.0.4
restart: always
volumes:
- ../:/app
- ./nginx/nginx.conf:/etc/nginx/conf.d/default.conf
template-project-db:
container_name: template-project-db
image: percona:8
user: root
restart: always
command: ['mysqld', '--user=root', "--sql-mode=STRICT_TRANS_TABLES,ALLOW_INVALID_DATES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION"]
environment:
MYSQL_DATABASE: template-project
MYSQL_ROOT_PASSWORD: pass
ports:
- '28082:3306'
volumes:
- ../:/app
- ./data/mysql:/var/lib/mysql:rw
networks:
template-project:
ipv4_address: 172.59.0.5
template-project-memcached:
container_name: template-project-memcached
image: memcached:latest
command: '-c 8192 -m 8192'
networks:
template-project:
ipv4_address: 172.59.0.17
aliases:
- memcached.docker
template-project-kafka:
image: docker.io/bitnami/kafka:3.6
ports:
- "28092:9092"
volumes:
- "kafka_data:/bitnami"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
networks:
template-project:
ipv4_address: 172.59.0.15
aliases:
- kafka
template-project-mailcatcher:
container_name: template-project-mailcatcher
image: schickling/mailcatcher
networks:
template-project:
ipv4_address: 172.59.0.14
aliases:
- mailcatcher
template-project-postfix:
container_name: template-project-postfix
build: ./postfix
depends_on:
- template-project-dovecot
environment:
smtp_user: user:password
volumes:
- ./postfix/scripts:/root/postfix:rw
- ./postfix/config/main.cf:/etc/postfix/main.cf
- template-project-postfix-dovecot:/shared
ports:
- '28025:25'
networks:
template-project:
ipv4_address: 172.59.0.13
aliases:
- mail.template-project.docker
template-project-dovecot:
container_name: template-project-dovecot
build: ./dovecot
volumes:
- ./dovecot/config/dovecot.conf:/etc/dovecot/dovecot.conf
- template-project-postfix-dovecot:/shared
ports:
- '28110:110'
- '28143:143'
networks:
template-project:
ipv4_address: 172.59.0.12
aliases:
- dovecot.docker
volumes:
mysql:
template-project-data: {}
template-project-postfix-dovecot: {}
kafka_data:
driver: local
networks:
template-project:
driver: bridge
ipam:
config:
- subnet: 172.59.0.0/20
В корне проекта с Symfony создайте папочку со следующей структурой:
docker
├── data
│ ├── mysql
│ └── .gitignore
├── dovecot
│ ├── config
│ │ └── dovecot.conf
│ └── Dockerfile
├── nginx
│ └── nginx.conf
├── php-fpm
│ └── Dockerfile
├── postfix
│ ├── config
│ │ └── main.cf
│ ├── scripts
│ │ └── mail_queue.sh
│ └── Dockerfile
├── tmp
└── docker-compose.yml
Makefile
По этой ссылке вы можете скачать все эти файлы в zip архиве.
Распакуйте zip архив в корень Symfony проекта и переместите Makefile из папки docker в корень проекта. Это можно сделать командой
mv ./docker/Makefile ./Makefile
Makefile содержит самые необходимые команды для управления докером:
make build - билд образов
make rebuild - ребилд образов без кэша
make up - подъем докера
make up-rebuild - подъем докера с ребилдом образов
make up-alone - подъем докера с тушением всех других, что не объявлены в yml
make down - остановить все контейнеры
make restart - рестарт всех контейнеров
make down-all - остановить все контейнеры включая не объявленные в yml
make down-v - остановить все контейнеры с удалением данных
make connect-to-php-fpm - подключение внутрь php-контейнера
make drop-database - удаление БД SF
make migrations - прогон миграций SF
make migration-diff - создание миграции через diff SF
make cache-clear - очистка кэша без прогрева SF
make cache-warmup - прогрев кэша SF
make clear-logs - очистка логов SF
make test - запуск автотестов codeception
make help - вызов справки
make kafka-create-topic - создает топик на сервере кафки для работы очереди
Прежде чем запускать докер проверьте, что у вас свободны указанные порты и имена контейнеров не конфликтуют с другими запущенными сборками. После этого можно в корне проекта запустить команду make up-rebuild - в этот момент скачаются образы и запустится докер.
Теперь можно перейти к установке бандла для кролика и его настройке. Мы будем использовать бандл koco/messenger-kafka, так как он единственный, который я нашел с плюс-минус нормальной документацией, правда без примеров и все пришлось расколупывать на собственном опыте. На момент написания статьи актуальная версия бандла 0.17.
Устанавливаем бандл командой
composer require koco/messenger-kafka
После этого у нас появится конфигурационный файл config/packages/messnger.yaml. В этом файле нам надо будет внести следующие настройки:
framework:
messenger:
transports:
producer:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
serializer: App\Infrastructure\Messenger\MySerializer
options:
flushTimeout: 10000
flushRetries: 5
topic:
name: 'messages'
kafka_conf:
log_level: '7'
metadata.broker.list: '%env(KAFKA_BROKERS)%'
message.send.max.retries: '5'
#security.protocol: 'sasl_ssl'
#ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
#sasl.username: '%env(KAFKA_SASL_USERNAME)%'
#sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
#sasl.mechanisms: 'SCRAM-SHA-256'
consumer:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
serializer: App\Infrastructure\Messenger\MySerializer
options:
commitAsync: true
receiveTimeout: 10000
topic:
name: 'messages'
kafka_conf:
enable.auto.offset.store: 'false'
group.id: 'my-group-id'
log_level: '7'
enable.auto.commit: 'true'
metadata.broker.list: '%env(KAFKA_BROKERS)%'
security.protocol: 'plaintext' # plaintext, ssl, sasl_plaintext, sasl_ssl
auto.offset.reset: 'earliest' # 'earliest': start from the beginning
enable.partition.eof: 'true' # Emit EOF event when reaching the end of a partition
#security.protocol: 'sasl_ssl'
#ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
#sasl.username: '%env(KAFKA_SASL_USERNAME)%'
#sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
#sasl.mechanisms: 'SCRAM-SHA-256'
max.poll.interval.ms: '45000'
topic_conf:
auto.offset.reset: 'earliest'
routing:
# Route your messages to the transports
'App\Message\SmsNotification': consumer
# when@test:
# framework:
# messenger:
# transports:
# # replace with your transport name here (e.g., my_transport: 'in-memory://')
# # For more Messenger testing tools, see https://github.com/zenstruck/messenger-test
# async: 'in-memory://'
Следующим шагом нужно создать переменные в .env файле:
...
MESSENGER_TRANSPORT_DSN=kafka://
KAFKA_BROKERS=template-project-kafka:9092
...
Строить работу кафки мы будем на базе CQRS Symfony, то есть разделять отправку сообщений и реализацию через хэндлер.
Нам нужно создать 3 класса:
-- App\Infrastructure\Messenger\MySerializer
-- App\Message\SmsNotification
-- App\MessageHandler\SmsNotificationHandler
<?php
namespace App\Infrastructure\Messenger;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
final class MySerializer implements SerializerInterface
{
public function decode(array $encodedEnvelope): Envelope
{
$record = json_decode($encodedEnvelope['body'], true);
return new Envelope(new SmsNotification(
$record['id'],
$record['name'],
$record['description'],
));
}
public function encode(Envelope $envelope): array
{
/** var ProductCreated $event */
$event = $envelope->getMessage();
return [
'key' => $event->getId(),
'headers' => [],
'body' => json_encode([
'id' => $event->getId(),
'name' => $event->getName(),
'description' => $event->getDescription(),
]),
];
}
}
<?php
namespace App\Message;
class SmsNotification
{
public function __construct(
private readonly string $id,
private readonly string $name,
private readonly string $description,
) {
}
public function getId(): string
{
return $this->id;
}
public function getName(): string
{
return $this->name;
}
public function getDescription(): string
{
return $this->description;
}
}
<?php
namespace App\MessageHandler;
use App\Message\SmsNotification;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
class SmsNotificationHandler
{
public function __construct(
//private readonly LoggerInterface $logger
) {
}
public function __invoke(SmsNotification $message)
{
sleep(2);
//$this->logger->emergency('FUCK YEAH IT IS WORKING!!! MAZATRUKKA!!!');
file_put_contents('/app/testFile.txt', $message->getId().PHP_EOL, FILE_APPEND);
// ... do some work - like sending an SMS message!
}
}
По факту SmsNotification - это и есть наше сообщение, а SmsNotificationHandler - это наш конзумер. Сериалайзер позволяет каким-либо образом дополнительно форматировать сообщения в случае, если это необходимо.
Также нам нужно немного отредактировать конфигурацию исключений автоматического подтягивания сервисов в config/services.yaml, в противном случае наш класс SmsNotification будет восприниматься Symfony как сервис и она будет пытаться его автоматически сконфигурировать и требовать от вас передать параметры в его конструктор.
parameters:
services:
_defaults:
autowire: true
autoconfigure: true
public: true
App\:
resource: '../src/'
exclude:
- '../src/DependencyInjection/'
- '../src/Entity/'
- '../src/Message/' # !!! нужно добавить вот эту вот строчку !!!
- '../src/Kernel.php'
Также нам потребуется где-то прописать запуск сообщения в очередь. Давайте снова создадим консольную команду под эти нужды:
<?php
namespace App\Command;
use App\Message\SmsNotification;
use Exception;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\MessageBusInterface;
#[AsCommand(name: 'test:test')]
class Test extends Command
{
public function __construct(
private readonly MessageBusInterface $messageBus,
string $name = null,
) {
parent::__construct($name);
}
/**
* @throws Exception
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->testKafka();
return Command::SUCCESS;
}
private function testKafka()
{
for ($i = 0; $i <= 100; $i++) {
$this->messageBus->dispatch(new SmsNotification($i, 'some name', 'some description'));
}
}
}
Теперь можем попробовать запустить нашу схему и проверить работоспособность. Делаем это следующим образом:
Для начала почистим кэш
make cache-clear && make cache-warmup
Если вы еще не создавали топик для кафки, то нужно запустить команду
make kafka-create-topic
В случае, если топик уже существует, то может отрисовать ошибку - в этом нет ничего страшного.
Заходим внутрь контейнера php-fpm docker ( нужно два терминала )
make php-fpm-connect
В одном терминале внутри контейнера докера запускаем команду
bin/console messenger:consume consumer
Команда в целом не должна выдавать никаких ошибок. Если начала выдавать - попробуйте выполнить make up-rebuild и запустить процесс заново.
Во втором терминале запустите команду
bin/console test:test
Команда отработает и у вас в корне проекта должен появиться файлик testFile.txt в который будут писаться номера обработанных сообщений. Если пишутся - значит всё работает.
По кафке могут сыпаться некоторые ошибки конфигураций, что какие-то конфигурационные значения принадлежат не продюсеру, а конзумеру - в этом нет ничего страшного, просто сейчас этот бандл так работает. Насколько я могу судить автор готовит версию бандла 1.0.* - возможно там будут какие-то новые варианты настройки бандла и эти ошибки пропадут.
Мы разобрались как создать локалку на Symfony с кафкой и кроликом. Надеюсь материалы будут вам полезны для осознанного выбора, а так же более быстрой развертки, чтобы не повторять путь проб и ошибок ( множества ошибок ).
Подписывайтесь на мой тг-канал.
Ура! Я наконец-то дописал статью как собирать собственные бандлы на Symfony 6!!!
Статья про EasyAdmin всё ещё в процессе )))
Не, ну мне же надо на чем-то тестировать твиттер локальный...
Я тут еще много полезного буду выкладывать, так что заходите обязательно почитать.
Сайтик пока что в разработке - это далеко не окончательная версия - по сути это то что удалось слепить за 8 часов.
Комментарии