Для того, чтобы принимать и обрабатывать flow-пакеты (netflow/netflow v9/sflow) в
должен быть прописан соответствующий слушатель. Если необходимо приходящие пакеты перед обработкой сохранять на диск, в слушатель нужно передать dataLogger. Объект dataLogger описывается один раз и передается всем flow-слушателям, с которых нужно сохранять данные.У слушателя необходимо указать параметры:
type - его тип: netflow, netflow9, sflow; |
host - хост (интерфейс), на котором будет открыт сокет, по умолчанию на всех интерфейсах; |
port - порт, на котором будет открыт сокет; |
recvBufferSize - размер буфера приема слушателя; |
soRCVBUF - рекомендуемый размер буфера приема сокета (SO_RCVBUF); |
threadCount - количество потоков-обработчиков; |
agentDeviceIds - id устройств, источников flow-потоков, если на данном порту нужно обрабатывать данные только у определенных источников; |
dataLogger - dataLogger, с помощью которого приходящие пакеты будут записываться в лог файлы. |
<!-- Служебный ScheduledExecutorService, необходимый для dataLogger --> <scheduledExecutorService name="hrlydtlggr" corePoolSize="1"/> <!-- Cоздание dataLogger, сохраняющего flow-пакеты на диск (только один экземпляр) --> <bean name="flowDataLogger" class="ru.bitel.bgbilling.modules.inet.collector.IPHourlyDataLogger"> <param name="scheduledExecutor">hrlydtlggr</param> </bean> <!-- Cоздание слушателя flow-пакетов на порту с передачей ему dataLogger --> <bean name="flowListener" class="ru.bitel.bgbilling.modules.inet.collector.InetFlowListener"> <constructor factoryMethod="newInstance"> <!-- Тип слушателя, netflow, netflow9 или sflow --> <param name="type" value="netflow"/> <!-- Хост (интерфейс), на котором будет открыт сокет. Если пусто - на всех --> <param name="host" value=""/> <!-- Порт, на котором будет открыт сокет --> <param name="port" value="2001"/> <!-- Размер буфера приема слушателя --> <param name="recvBufferSize">4 * 1024 * 1024</param> <!-- Рекомендуемый SO_RCVBUF сокета --> <param name="soRCVBUF">512 * 1024</param> <!-- Количество потоков-обработчиков --> <param name="threadCount" value="10"/> <!-- id устройств-источников, если на данном порту нужно получать пакеты только c определенных источников --> <param name="agentDeviceIds" value=""/> <!-- id устройств-источников, если на данном порту нужно обрабатывать пакеты только c определенных источников --> <param name="processAgentDeviceIds" value=""/> <!-- 1 - если нужно запретить сохранять и обрабатывать пакеты, в которых нет записей с IP-адресами из IP-ресурсов, 2 - с учетом периодов IP-ресурсов --> <param name="ipResourceFilter" value=""/> <!-- Передача dataLogger --> <param name="dataLogger">flowDataLogger</param> </constructor> </bean>
Каждый слушатель использует буферы памяти в direct memory, которая не находится в обычном heap java, общий размер которых можно расчитать как recvBufferSize (+ recvBufferSize) + threadCount * datalog.flow.chunk.size(datalog.chunk.size). Direct memory может использоваться также другими слушателями, а также при обработке логов. При превышении доступной памяти произойдет ошибка java.lang.OutOfMemoryError: Direct buffer memory. В старых билдах JRE MaxDirectMemorySize по умолчанию ограничен 64МБ, в новых - ограничен размером свободной оперативной памяти. Для того, чтобы указать конкретное значение, используется параметр в строке запуска -XX:MaxDirectMemorySize.
Обычно достаточно оставить параметр agentDeviceIds пустым, что будет означать, что поток будет приниматься со всех устройств-источников, начиная от корневого устройства Accounting-сервера. Однако в ситуации, когда с одного IP-адреса приходит два потока с разных источников, чтобы Accounting корректно разделял данные на два источника, необходимо создать два устройства-источника с одним и тем же IP-адресом и добавить два слушателя на двух разных портах и в agentDeviceIds одного прописать первый источник, во втором - второй. Таким образом трафик, идущий с одного и того же адреса на один порт будет считаться как трафик с одного источника, на другой порт - с другого источника.
В конфигурации типа устройства-источника (или в конфигурации устройства) необходимо указать какой поток идет с источника:
#Тип источника, netflow, netflow9 или sflow #по умолчанию - netflow flow.agent.type=netflow
Когда сессия стартует на каком-либо устройстве, по умолчанию считается, что это устройство и есть источник flow потока для данной сессии. Например, когда устройство одновременно является NAS'ом и источником netflow.
В схеме, когда flow-сенсор находится выше, необходимо указать для каждого устройства (например, NAS'а) привязку к источнику. Например, источник - это устройство с кодом 3. В конфигурации устройства-NAS'а прописываем, что трафик с любого интерфейса источника 3 может принадлежать сессии с данного NAS'а:
flow.agent.link=3:-1
Или, трафик с интерфейсов 1 или 2 может принадлежать сессии с данного NAS'а. Т.е. flow пакеты даже с таким же IP-адресом, как у сессии, но с другим интерфейсом, привязаны к ней не будут.
flow.agent.link=3:1,2
Для того, чтобы указать, что устройство является источником для всех своих дочерних устройств, достаточно в конфигурации типа устройства-источника прописать:
flow.agent.link={@deviceId}:-1
При использовании InetFlowListener возможна схема, когда будет множество устройств-коммутаторов с интерфейсами, которые никак не связаны с Flow-агентом и поэтому у них не указан в конфигурации flow.agent.link. Исторически сложилось, что если параметр flow.agent.link не указан, то устройство само считается Flow-агентом с указанными интерфейсами. Для экономии памяти рекомендуется указать для таких устройств flow.agent.link={@deviceId}:-1.
Если необходимо исключить из сохранения и обработки пакеты, в которых отсутствуют записи с IP-адресами, принадлежащими одному из заведенных диапазонов IP-ресурсов, в inet-accounting.xml для слушателя нужно указать параметр <param name="ipResourceFilter" value="1"/>, или же глобально (т.е. для всех слушателей) - в конфигурации модуля:
# Фильтр flow пакетов: # 0 (по умолчанию) - не фильтровать, 1 - не сохранять и не обрабатывать пакеты, в которых нет записей с IP-адресами из IP-ресурсов, # 2 - то же самое, что 1, но с учетом периодов IP-ресурсов. flow.ipResourceFilter=1