Общая схема:
host-sflow -> sFlow -> goflow2 collector -> kafka protobuf -> clickhouse
1. Серверная часть сборки sflow
https://github.com/netsampler/goflow2
NetFlow/IPFIX/sFlow collector in Go
Kafka
Подготовить топик nflow
goflow2
Запущен в Docker
docker run --net=host -ti netsampler/goflow2:latest -transport=kafka -transport.kafka.brokers=localhost:9092 -transport.kafka.topic=nflow -format=pb -format.protobuf.fixedlen=true
Clickhouse
Скачать настройки протокола protobuf
/var/lib/clickhouse/user_files/protocols.csv
Таблицы и подключение к kafka
CREATE TABLE IF NOT EXISTS nflow.flows_kafka(time_received UInt64,time_flow_start UInt64,sequence_num UInt32,sampling_rate UInt64,sampler_address FixedString(16),src_addr FixedString(16),dst_addr FixedString(16),src_as UInt32,dst_as UInt32,etype UInt32,proto UInt32,src_port UInt32,dst_port UInt32,bytes UInt64,packets UInt64) ENGINE = Kafka()SETTINGSkafka_broker_list = '127.0.0.1:9092',kafka_topic_list = 'nflow',kafka_group_name = 'clickhouse',kafka_format = 'Protobuf',kafka_schema = 'flow.proto:FlowMessage',kafka_skip_broken_messages = 1;CREATE TABLE IF NOT EXISTS nflow.flows_raw(date Date,datetime DateTime,sequence_num UInt32,sampling_rate UInt64,sampler_address String,src_addr String,dst_addr String,src_as UInt32,dst_as UInt32,etype UInt32,proto UInt32,src_port UInt32,dst_port UInt32,bytes UInt64,packets UInt64) ENGINE = MergeTree()PARTITION BY dateORDER BY datetimeTTL date + toIntervalDay(30)SETTINGS index_granularity = 8192;CREATE MATERIALIZED VIEW IF NOT EXISTS nflow.flows_raw_mv TO nflow.flows_rawAS SELECTtoDate(time_received) AS date,time_flow_start AS datetime,sequence_num,sampling_rate,IPv4NumToString(reinterpretAsUInt32(substring(reverse(sampler_address), 13, 4))) AS sampler_address,IPv4NumToString(reinterpretAsUInt32(substring(reverse(src_addr), 13, 4))) AS src_addr,IPv4NumToString(reinterpretAsUInt32(substring(reverse(dst_addr), 13, 4))) AS dst_addr,src_as,dst_as,etype,proto,src_port,dst_port,bytes,packetsFROM nflow.flows_kafka;
2. Установка сборщика флоу на хост
Используем host-sflow
cat /etc/hsflowd.conf
sflow {
collector { ip=goflow2.host.example.com udpport=6343 }
# ====== Local configuration ======
pcap { dev = eth0 }
pcap { dev = eth1 }
# ...
}
systemctl enable hsflowd
systemctl start hsflowd
3. Примеры запросов
Активные сборщики флоу
select sampler_address,count() from nflow.flows_raw where datetime>now()-24*3600 group by sampler_address limit 100;
Найти источник запросов на список внешних ресурсов
select src_addr,count() from nflow.flows_raw where datetime>now()-24*3600 and dst_addr in ('1.1.1.1','8.8.8.8') group by src_addr order by count() desc limit 100;