flume 日志收集

Posted by LQ on March 24, 2017

Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.

flume

flume其实就是一个分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去。也可以对数据进行简单的处理。

Evnet

event是将从数据源(source)收集过来,再缓存(channel)传输到目的地(sink)的数据进行封装,是flume传输数据的基本单位。传输流程为event从source,流向channel,再到sink,一个完整的event包括:event headers、event body、event信息(即文本文件中的单行记录)

Client

Client 是一个将原始log包装成events并且发送它们到一个或者多个agent的实体。Client 在Flume的拓扑结构中不是必须的,它的目的是从数据源系统中解耦Flume。

Agent:

一个独立的Flume进程,包含组件Source、Channel、Sink。它利用这些组件将events从一个节点传输到另一个节点或最终目的地。agent是Flume流的基础部分。

  • Source:用来消费传递到该组件的Event,负责接收event或通过特殊机制产生event,并将events批量的放到一个或多个Channel。 它包含event驱动和轮询两种类型。且必须至少和一个Channel关联。类型有Syslog、NetCat、Exec、Avro、Thrift。
Source类型 说明
Avro Source 支持Avro协议(实际上是Avro RPC),内置支持
Thrift Source 支持Thrift协议,内置支持
Exec Source 基于Unix的command在标准输出上生产数据
JMS Source 从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过
Spooling Directory Source 监控指定目录内数据变更
Twitter 1% firehose Source 通过API持续下载Twitter数据,试验性质
Netcat Source 监控某个端口,将流经端口的每一个文本行数据作为Event输入
Sequence Generator Source 序列生成器数据源,生产序列数据
Syslog Sources 读取syslog数据,产生Event,支持UDP和TCP两种协议
HTTP Source 基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式
Legacy Sources 兼容老的Flume OG中Source(0.9.x版本)
  • Channel:Channel位于Source和Sink之间,用于缓存进来的event。当Sink成功的将event发送到下一跳的Channel或最终目的地,event才Channel中移除。不同的Channel提供的持久化水平也是不一样的,类型主要有Memory Channel、JDBC Channel、File Channel,可以和任何数量的Source和Sink工作。
Channel类型 说明
Memory Channel Event数据存储在内存中
JDBC Channel Event数据存储在持久化存储中,当前Flume Channel内置支持Derby
File Channel Event数据存储在磁盘文件中
Spillable Memory Channel Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用)
Pseudo Transaction Channel 测试用途
Custom Channel 自定义Channel实现
  • Sink :Sink负责将event传输到下一跳或最终目的,成功完成后将event从Channel移除。 它也有不同类型的Sink:存储event到最终目的的终端Sink。比如HDFS,HBase;自动消耗的Sink。比如:Null Sink;用于Agent间通信的IPC sink:Avro。Sink必须作用于一个确切的Channel。
Sink类型 说明
HDFS Sink 数据写入HDFS
Logger Sink 数据写入日志文件
Avro Sink 数据被转换成Avro Event,然后发送到配置的RPC端口上
Thrift Sink 数据被转换成Thrift Event,然后发送到配置的RPC端口上
IRC Sink 数据在IRC上进行回放
File Roll Sink 存储数据到本地文件系统
Null Sink 丢弃到所有数据
HBase Sink 数据写入HBase数据库
Morphline Solr Sink 数据发送到Solr搜索服务器(集群)
ElasticSearch Sink 数据发送到Elastic Search搜索服务器(集群)
Kite Dataset Sink 写数据到Kite Dataset,试验性质的
Custom Sink 自定义Sink实现

Data Flow:

flume的核心就是一个agent,这个agent对外有两个进行交互的地方,一个是接受数据的输入——source,一个是数据的输出sink,sink负责将数据发送到外部指定的目的地。source接收到数据之后,将数据发送给channel,chanel作为一个数据缓冲区会临时存放这些数据,随后sink会将channel中的数据发送到指定的地方—-例如HDFS等,注意:只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。

如何配置

做过的案例: 收集A,B两台机器上的ngnix日志access.log,汇总到C机器上。 假设各个机器IP地址为:A->1.1;B->1.2;C->1.3

  1. 配置A和B机器,相同
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/logs/ngnix/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.1.3
a1.sinks.k1.port = 41414

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  1. 配置C机器
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# source r1定义拦截器,为消息添加时间戳
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder


#具体定义sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.1.156:9000/source/%{type}/%Y%m%d
#指定文件前缀
a1.sinks.k1.hdfs.filePrefix = events-
#不压缩
a1.sinks.k1.hdfs.fileType = DataStream
#如果压缩指定压缩的方式
#a1.sinks.k1.hdfs.fileType = CompressedStream
#a1.sinks.k1.hdfs.codeC = gzip
#不按照条数生成文件
a1.sinks.k1.hdfs.rollCount = 0
#如果压缩存储的话HDFS上的文件达到64M时生成一个文件注意是压缩前大小为64生成一个文件,然后压缩存储。
a1.sinks.k1.hdfs.rollSize = 67108864
a1.sinks.k1.hdfs.rollInterval = 0

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  1. 启动flume
flume-ng  agent -n a1  -c /opt/conf  -f /opt/conf/netcat.conf   -Dflume.root.logger=DEBUG,console
<!---c 指定flume中配置文件的目录 -->
<!---f 指定配置文件 -->
<!---Dflume.root.logger=DEBUG,console 设置日志等级-->