Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.
flume
flume其实就是一个分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去。也可以对数据进行简单的处理。
Evnet
event是将从数据源(source)收集过来,再缓存(channel)传输到目的地(sink)的数据进行封装,是flume传输数据的基本单位。传输流程为event从source,流向channel,再到sink,一个完整的event包括:event headers、event body、event信息(即文本文件中的单行记录)
Client
Client 是一个将原始log包装成events并且发送它们到一个或者多个agent的实体。Client 在Flume的拓扑结构中不是必须的,它的目的是从数据源系统中解耦Flume。
Agent:
一个独立的Flume进程,包含组件Source、Channel、Sink。它利用这些组件将events从一个节点传输到另一个节点或最终目的地。agent是Flume流的基础部分。
- Source:用来消费传递到该组件的Event,负责接收event或通过特殊机制产生event,并将events批量的放到一个或多个Channel。 它包含event驱动和轮询两种类型。且必须至少和一个Channel关联。类型有Syslog、NetCat、Exec、Avro、Thrift。
Source类型 | 说明 |
---|---|
Avro Source | 支持Avro协议(实际上是Avro RPC),内置支持 |
Thrift Source | 支持Thrift协议,内置支持 |
Exec Source | 基于Unix的command在标准输出上生产数据 |
JMS Source | 从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过 |
Spooling Directory Source | 监控指定目录内数据变更 |
Twitter 1% firehose Source | 通过API持续下载Twitter数据,试验性质 |
Netcat Source | 监控某个端口,将流经端口的每一个文本行数据作为Event输入 |
Sequence Generator Source | 序列生成器数据源,生产序列数据 |
Syslog Sources | 读取syslog数据,产生Event,支持UDP和TCP两种协议 |
HTTP Source | 基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式 |
Legacy Sources | 兼容老的Flume OG中Source(0.9.x版本) |
- Channel:Channel位于Source和Sink之间,用于缓存进来的event。当Sink成功的将event发送到下一跳的Channel或最终目的地,event才Channel中移除。不同的Channel提供的持久化水平也是不一样的,类型主要有Memory Channel、JDBC Channel、File Channel,可以和任何数量的Source和Sink工作。
Channel类型 | 说明 |
---|---|
Memory Channel | Event数据存储在内存中 |
JDBC Channel | Event数据存储在持久化存储中,当前Flume Channel内置支持Derby |
File Channel | Event数据存储在磁盘文件中 |
Spillable Memory Channel | Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用) |
Pseudo Transaction Channel | 测试用途 |
Custom Channel | 自定义Channel实现 |
- Sink :Sink负责将event传输到下一跳或最终目的,成功完成后将event从Channel移除。 它也有不同类型的Sink:存储event到最终目的的终端Sink。比如HDFS,HBase;自动消耗的Sink。比如:Null Sink;用于Agent间通信的IPC sink:Avro。Sink必须作用于一个确切的Channel。
Sink类型 | 说明 |
---|---|
HDFS Sink | 数据写入HDFS |
Logger Sink | 数据写入日志文件 |
Avro Sink | 数据被转换成Avro Event,然后发送到配置的RPC端口上 |
Thrift Sink | 数据被转换成Thrift Event,然后发送到配置的RPC端口上 |
IRC Sink | 数据在IRC上进行回放 |
File Roll Sink | 存储数据到本地文件系统 |
Null Sink | 丢弃到所有数据 |
HBase Sink | 数据写入HBase数据库 |
Morphline Solr Sink | 数据发送到Solr搜索服务器(集群) |
ElasticSearch Sink | 数据发送到Elastic Search搜索服务器(集群) |
Kite Dataset Sink | 写数据到Kite Dataset,试验性质的 |
Custom Sink | 自定义Sink实现 |
Data Flow:
flume的核心就是一个agent,这个agent对外有两个进行交互的地方,一个是接受数据的输入——source,一个是数据的输出sink,sink负责将数据发送到外部指定的目的地。source接收到数据之后,将数据发送给channel,chanel作为一个数据缓冲区会临时存放这些数据,随后sink会将channel中的数据发送到指定的地方—-例如HDFS等,注意:只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。
如何配置
做过的案例: 收集A,B两台机器上的ngnix日志access.log,汇总到C机器上。 假设各个机器IP地址为:A->1.1;B->1.2;C->1.3
- 配置A和B机器,相同
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/logs/ngnix/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.1.3
a1.sinks.k1.port = 41414
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 配置C机器
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
# source r1定义拦截器,为消息添加时间戳
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
#具体定义sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.1.156:9000/source/%{type}/%Y%m%d
#指定文件前缀
a1.sinks.k1.hdfs.filePrefix = events-
#不压缩
a1.sinks.k1.hdfs.fileType = DataStream
#如果压缩指定压缩的方式
#a1.sinks.k1.hdfs.fileType = CompressedStream
#a1.sinks.k1.hdfs.codeC = gzip
#不按照条数生成文件
a1.sinks.k1.hdfs.rollCount = 0
#如果压缩存储的话HDFS上的文件达到64M时生成一个文件注意是压缩前大小为64生成一个文件,然后压缩存储。
a1.sinks.k1.hdfs.rollSize = 67108864
a1.sinks.k1.hdfs.rollInterval = 0
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 启动flume
flume-ng agent -n a1 -c /opt/conf -f /opt/conf/netcat.conf -Dflume.root.logger=DEBUG,console
<!---c 指定flume中配置文件的目录 -->
<!---f 指定配置文件 -->
<!---Dflume.root.logger=DEBUG,console 设置日志等级-->