ActiveMQ-сервер

MQ (Message Queue)-сервер необходим для передачи сообщений между различными приложениями - компонентами системы. Он также важен для работы, как и сервер базы данных. В качестве MQ-сервера используется Apache ActiveMQ.

Быстрая установка

  • Для выполнения данного сценария требуются привилегии root или sudo.

  • Данный сценарий не позволяет настраивать параметры установки.

  • Сценарий требует установленных утилит nc, wget, curl, unzip, sed.

  • Мы рекомендуем в сегда проверять сценарии, загруженные из Интернета, прежде чем запускать их локально.

Выполните с привилегиями sudo или root:

curl -fsSL https://raw.githubusercontent.com/bgbilling/images-base/master/install/activemq/5.15.5/activemq.sh activemq.sh
sh -eux activemq.sh

Укажите путь к JDK в параметре JAVA_HOME в файле /opt/activemq/current/bin/env.

Для системы с systemd вызовите (обратите внимание, что сервис будет запускаться от пользователя activemq):

systemctl enable activemq
systemctl start activemq

Для системы с sysvinit отредактируйте файл bin/linux-x86-64/wrapper.conf, укажите правильный путь к бинарному файлу java в параметре wrapper.java.command. Создайте символическую ссылку:

ln -s /opt/activemq/current/bin/linux-x86-64/activemq /etc/init.d/activemq

и вызовите:

service activemq start

Подробная установка

Загрузите ActiveMQ с официального сайта (или установите из репозитария Linux, однако в этом случае пути файлов могут отличаться от путей, указанных в данной главе). Рекомендуемая версия - 5.14.4 или выше, т.к. в биллинге используются клиентские библиотеки 5.14.4.

Linux

  • Убедитесь, что имя сервера с ActiveMQ указано в файле /etc/hosts. Имя сервера можно получить командой uname -n.

  • При распаковке в системе Linux обратите внимание, чтобы tar/zip поддерживал длинные имена файлов (проблема проявлялась в старых дистрибутивах Linux и FreeBSD), иначе распаковка пройдет некорректно и для нормальной работы activeMQ не будет хватать нужных файлов.

Распакуйте архив в каталог /opt/activemq, создайте символическую ссылку, например:

ln -s /opt/activemq/apache-activemq-5.15.5 /opt/activemq/current

systemd

Для системы с systemd вызовите создайте файл activemq.service в каталоге /lib/systemd/system с содержимым:

[Unit]
Description=Apache ActiveMQ
After=network-online.target
 
[Service]
Type=forking
WorkingDirectory=/opt/activemq/current/bin
ExecStart=/opt/activemq/current/bin/activemq start
ExecStop=/opt/activemq/current/bin/activemq stop
Restart=on-abort
RestartSec=60
User=root
Group=root
 
[Install]
WantedBy=multi-user.target

Если вы хотите, чтобы ActiveMQ запускался под пользователем activemq, то создайте пользователя, отредактируйте activemq.service и дайте пользователю права на каталог /opt/activemq/current.

Затем выполните:

systemctl enable activemq
systemctl start activemq

Логи выполнения хранятся в data/activemq.log и data/wrapper.log, по ним можно проследить безаварийный старт сервиса.

sysvinit

Укажите в скрипте запуска /opt/activemq/current/bin/linux/wrapper.conf переменную wrapper.java.command. Например:

# Java Application
wrapper.java.command=/opt/java/jdk/bin/java

Создайте ссылку на службу (init.d).

ln -s /opt/activemq/current/bin/linux-x86-64/activemq /etc/init.d/activemq

Настройте автоматический запуск службы и запустите её. При работе на одной машине с приложениями биллинга служба должна стартовать раньше всех приложений биллинга (регулируется префиксом ссылки).

Логи выполнения хранятся в data/activemq.log и data/wrapper.log, по ним можно проследить безаварийный старт сервиса.

Windows

Настройте системную переменную ACTIVEMQ_HOME, указывающую на каталог установки ActiveMQ.

Перейдите в директорию ACTIVEMQ_HOME/bin/win32. Выполните InstallService.bat. После выполнения в списке служб Windows должна появится служба ActiveMQ.

Логи выполнения хранятся в data/activemq.log и data/wrapper.log, по ним можно проследить безаварийный старт сервиса.

Настройка

Главный конфигурационный файл ActiveMQ, использующийся по умолчанию - conf/activemq.xml. Логин и пароль (те, что указываются в биллинге в mq.user и mq.pswd) расположены в файле conf/credentials.properties. Рекомендуемая конфигурация activemq.xml:

<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
 
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
 
<!-- Allows accessing the server log -->
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
 
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
 
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" optimizedDispatch="true">
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
 
http://activemq.apache.org/slow-consumer-handling.html
 
-->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="50000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
 
 
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
 
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
 
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
 
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
<!-- <kahaDB directory="${activemq.data}/kahadb" journalDiskSyncStrategy="periodic"/> -->
</persistenceAdapter>
 
<plugins>
<!-- drop messages that have been sent to the DLQ -->
<discardingDLQBrokerPlugin dropAll="true"/>
 
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="user"/>
<authenticationUser username="listener" password="listener" groups="anonymous"/>
</users>
</simpleAuthenticationPlugin>
<!-- lets configure a destination based authorization mechanism -->
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue="BG.Event.>" read="user" write="user" admin="user" />
<authorizationEntry topic="BG.Event.>" read="user, anonymous" write="user" admin="user" />
 
<authorizationEntry topic="ActiveMQ.Advisory.>" read="user, anonymous" write="user, anonymous" admin="user, anonymous"/>
</authorizationEntries>
 
<!-- let's assign roles to temporary destinations. comment this entry if we don't want any roles assigned to temp destinations -->
<!--
<tempDestinationAuthorizationEntry>
<tempDestinationAuthorizationEntry read="tempDestinationAdmins" write="tempDestinationAdmins" admin="tempDestinationAdmins"/>
</tempDestinationAuthorizationEntry>
-->
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
 
<!--
The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="256 mb"/>
<!--<memoryUsage percentOfJvmHeap="70" /> -->
</memoryUsage>
<storeUsage>
<storeUsage limit="10 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="1 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
 
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
 
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://127.0.0.1:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
 
<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
 
</broker>
 
<!--
Enable web consoles, REST and Ajax APIs and demos
The web consoles requires by default login, you can disable this in the jetty.xml file
 
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>
 
</beans>

Обратите внимание на строчку:

<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="user"/>

Значения ${activemq.username} и ${activemq.password} являются "ссылками" на значения из файла credentials.properties и используются для аутентификации пользователя MQ (в нашем случае пользователь MQ - это биллинг).

В последней строчке (<import resource="jetty.xml"/>) включается веб-консоль activeMQ, при этом в jetty.xml по умолчанию указан хост "0.0.0.0". Рекомендуется поменять значение на 127.0.0.1, даже если строка (<import resource="jetty.xml"/>) закомментирована (т.е. даже если веб-консоль отключена - на случай, если понадобиться её включить):

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="127.0.0.1"/>
<property name="port" value="8161"/>
</bean>

Также рекомендуется использовать фаервол, разрешая доступ только для необходимых портов с необходимых сетей.

В ветке plugins указан параметр, при котором все сообщения, у которых истек timeToLive будут удаляться (по умолчанию они переносятся в очередь ActiveMQ.DLQ):

<!-- drop messages that have been sent to the DLQ -->
<discardingDLQBrokerPlugin dropAll="true"/>

Ниже описывается использование системных ресурсов для NON_PERSISTENT, PERSISTENT-сообщений и временных очередей. При превышении данных ресурсов отправка сообщений будет замедлена:

<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="256 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="10 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="1 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>

В этом отрывке указывается тип коннектора для работы с сервером, интерфейс и порт:

<transportConnectors>
<transportConnector name="openwire" uri="tcp://127.0.0.1:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

К этому порту будут подключаться к серверу MQ приложения биллинговой системы. Если все компоненты биллинга установлены на одном сервере, то можно оставить значение uri=tcp://127.0.0.1:61616. Иначе нужно указать IP-адрес интерфейса, на который будут идти подключения или установить uri=tcp://0.0.0.0:61616, чтобы порт был открыт на всех интерфейсах.

Параметры подключения к серверу ActiveMQ указываются в каждом серверном приложении в .properties файле, например в data/data.properties для сервера биллинга.

mq.url=failover:(nio://127.0.0.1:61616)
mq.user=bill
mq.pswd=bgbilling

Значения mq.user и mq.pswd должны совпадать со значениями из conf/credentials.properties ActiveMQ.

Для локальной машины mq.url=failover:(nio://127.0.0.1:61616), для нескольких серверов (должна быть настроена поддержка сети серверов в каждом из MQ-серверов):

mq.url=failover:(tcp://mq1.core.provider.org:61616,tcp://mq1.core.provider.org:61616)

В последнем случае подключение будет к случайному из списка, если подключение невозможно - идет попытка подключения к следующему указанному серверу MQ, и так пока не установится подключение. Если второй сервер играет роль "запасного" - например, он установлен на слабой машине и должен принять работу только, если прервется работа первого сервера, то можно указать, чтобы подключение не устанавливалось к случайному, а попытки шли в указанном порядке:

mq.url=failover:(tcp://mq1.core.provider.org:61616,tcp://mq1.core.provider.org:61616)?randomize=false