算子清洗功能介绍-j9九游会登录
功能说明
filebeat是一个轻量级的采集器,用于采集和处理,转发日志数据。filebeat将作为代理安装在您的服务器上,监控您指定的日志文件或位置,收集日志事件,并将它们转发到kafka,推送到下一个处理点。
processors是filebeat定义了一系列对单条日志操作的方法,目前支持三类操作:
- 减少导出字段的数量
- 使用附加元数据增加字段
- 执行额外的处理或解码
每个processor接受一个事件(单条日志),将用户定义的操作应用于该事件并返回。如果您定义了一个处理器列表,它们将按照在filebeat配置文件中定义的顺序执行。任何一个算子执行失败,都会直接终止执行链条,并将异常事件发送到fail_to_topic,异常信息会记录在事件中的@errmsg中。
算子配置基本结构
算子配置基本配置结构如下,具体可参见。
processors: -: when: - : when:
其中
更复杂的条件处理可以通过使用if-then-else条件表达式配置来完成,这允许基于单个条件执行多个处理器。
processors:
- if:
then:
- :
- :
...
else:
- :
- :
...
if-then-else条件表达式中,then是必须的,else是可选的。
在日志采集配置场景中,定义的processor只能针对某个input进行配置,没有全局配置。
算子介绍
filebeat自带的processor算子,如下:
、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、。
其他算子介绍如表1所示。
|
算子 |
算子能力 |
算子样例 |
|---|---|---|
|
equals |
比较字段是否具有特定值,只接受整数或字符串值,常和concat、when、and等算子组合使用。 |
例如:如果message中的type="cli",then接后续逻辑。 - if:
equals:
type: "cli"
then: |
|
contains |
检查一个值是否是个字段的一部分,该字段可以是字符串或字符串数组。条件只接受一个字符串值。常和if、equals、and、concat算子组合使用。 |
例如:检查日志字段中的@logtype是否含有modelservicegateway_interface_log,then接后续逻辑。 - if:
contains:
"@logtype": "modelservicegateway_interface_log"
then: |
|
regexp |
校验字符串是否满足正则表达式,只接受字符串。 |
例如:以下条件检查进程名称是否以foo开头: regexp: system.process.name: "^foo.*" |
|
range |
检查该字段是在一定范围内的值,支持小于lt、小于等于lte、大于gt和大于等于gte,条件只接受整数或浮点值。 |
例如:以下条件通过将http.response.code字段与400进行比较来检查失败的http事务。 range: http.response.code: gte: 400 这也可以写成: range: http.response.code.gte: 400 以下条件检查cpu使用率的百分比值是否介于0.5和0.8之间。 range: system.cpu.user.pct.gte: 0.5 system.cpu.user.pct.lt: 0.8 |
|
network |
检查该字段是否在某个ip网络范围内,支持ipv4和ipv6地址。可以使用cidr表示法指定网络范围,例如“192.0.2.0/24”或“2001:db8::/32”,或使用以下命名范围之一:
|
如果source.ip值在私有地址空间内,则以下条件返回true。 network: source.ip: private 如果destination.ip值的ipv4范围内192.168.1.0 - 192.168.1.255,则以下条件返回true。 network: destination.ip: '192.168.1.0/24' 当destination.ip在任何给定的子网中时,则以下条件返回true 。 network: destination.ip: ['192.168.1.0/24', '10.0.0.0/8', loopback] |
|
has_fields |
如果事件存在的所有给定的领域,该条件接受表示字段名称的字符串值列表。 |
例如:以下条件检查http.response.code字段是否存在于事件中。 has_fields: ['http.response.code'] |
|
or |
逻辑条件表达式,只需满足其中一个条件 |
例如:配置条件appid等于null或者为空,则把appid3的值赋值给appid。 - if:
or:
- equals:
appid: "null"
- equals:
appid: ""
then:
- assign:
source: ["appid3"]
target_field: "appid" |
|
and |
逻辑条件表达式,同时满足两个或多个条件,常见组合算子有if、or、contains、 equals等算子。 |
例如:判断某个字段值为""且message中的@logtype字段含有modelservicegateway_interface_log,就将字段modelcost1赋值给modelcost字段。 - if:
and:
- equals:
modelcost: ""
- contains:
"@logtype": "modelservicegateway_interface_log"
then:
- assign:
source: ["modelcost1"]
target_field: "modelcost" |
|
not |
接收否定逻辑表达。 |
例如:配置条件@logtype不为modelservicegateway_interface_log时,把@logtype字段的值赋值给clustername字段。 - if:
not:
equals:
" @logtype": "modelservicegateway_interface_log"
then:
- assign:
source: ["@logtype"]
target_field: "clustername" |
|
replace |
针对需要对维度降维或者其他替换场景。 |
例如:将字段clustername中的_modelservicegateway_interface_log替换成""。 - replace:
fail_on_error: false
ignore_missing: true
fields:
- {field: "clustername",pattern: '_modelservicegateway_interface_log',replacement: ""}
|
|
dissect |
根据指定的分隔符切分字符串。 |
使用dissect算子更好地组织和解析数据,便于后续的分析或处理,使用示例如下: - dissect:
tokenizer: "%{key1} %{key2} %{key3|convert_datatype}"
field: "message"
target_prefix: ""
如果message字段的内容是“key1=value1 key2=value2 key3=2022-01-01”,经过dissect操作后将会生成新的字段,key1的值为 “value1”,key2为 “value2”,key3或key3_date(进行了转换)为日期格式的 “2022-01-01”。 |
|
timestamp |
增加配置目标时间字段格式。 |
时间戳算子仅支持以时间对象设置目标字段,为了适配业务对时间戳的诉求,特殊增加了“target_layout”字段,用来配置目标格式,增加“target_timezone”字段,来配置目标时区。 - timestamp:
field: timestring
target_field: timestamp
target_layout: "unix_ms"
timezone: "xxx/xxxx"
layouts:
- '2006-01-02 15:04:05.999'
test:
- '2021-02-11 17:27:25.025'
target_layout支持的格式如下:
|
|
assign |
根据条件赋值。 |
- assign:
source: ["spvolumeid","vod_id","campalias"]
target_field: "content_id_a"
|
|
concat |
字段之间或者字段和字符串之间拼接 |
- concat:
when:
and:
- not:
equals:
source: ""
- not:
equals:
target: ""
fields:
- { field: "%{source}"}
- { field: "->"}
- { field: "%{target}"}
target_field: interface_type
|
|
decode_json |
对某一个json字段进行解析。 |
例如:对retmsg进行解析。 - decode_json:
fields:
- field: "retmsg"
filters:
- {target_field: "user_id",type: "string",filter_keys: ["useid"]}
|
|
format_tuple |
针对有简单规律的字符串结构解析,比如url参数解析。 |
- format_tuple:
field: biztag
collection_items_terminated_by: "&"
map_key_terminated_by: "="
filters:
- { target_field: "useragent", filter_keys: ["user-agent","useragent","user_agent"], default: "-" }
- { target_field: "servicetype", filter_keys: ["servicetype","service_type"], default: "-" }
如果输入biztag的值是:"user-agent= office&servicetype=wiseoap&otherarrtributes=other"解析后日志为:{"useragent":" office","servicetype":"wiseoap"}
|
|
mapping |
将事件的值做一个映射。 |
- mapping:
dict:
contentserver: "videocontentserverservice"
contentmanage: "videocontentmanageservice"
onsaleserver: "videocontentonsaleservice"
playserver: "videoplayserverservice"
kmsserver: "videokmsservice"
drmmanage: "videodrmmanageservice"
drmproxy: "videoopendrmproxyservice"
opendrmmanage: "videoopendrmmanageservice"
cads: "videoopendrmcadsservice"
quickserver: "videoquickserverservice"
shortmanage: "videoshortmanageservice"
shortserver: "videoshortserverservice"
shortonsale: "videoshortonsaleservice"
sysmanage: "videosysmanageservice"
sysserver: "videosysserverservice"
behavaiorservice: "videouserbehaviorservice"
campaignservice: "videousercampaignservice"
poservice: "videoorderservice"
productservice: "videoproductservice"
rightservice: "videouserrightservice"
userservice: "videouserauthservice"
rules:
- { source_key: "source" }
- { source_key: "target_pre" }
|
|
segment |
分段算子,对一个字段通过一批判断逻辑后,简单的赋值操作,比如针对时延分段。 |
- segment:
field: response_time
target_field: delay_section
default: "-1"
rules:
- { label: "0-50",operation: [{operation_type: "gte", value: 0},{operation_type: "lt",value: 50}]}
- { label: "50-100",operation: [{operation_type: "gte",value: 50},{operation_type: "lt",value: 100}]}
- { label: "100-200",operation: [{operation_type: "gte",value: 100},{operation_type: "lt",value: 200}]}
- { label: "200-500",operation: [{operation_type: "gte",value: 200},{operation_type: "lt",value: 500}]}
- { label: "500-1000",operation: [{operation_type: "gte",value: 500},{operation_type: "lt",value: 1000}]}
- { label: "1000-2000",operation: [{operation_type: "gte",value: 1000},{operation_type: "lt",value: 2000}]}
- { label: "2000-5000",operation: [{operation_type: "gte",value: 2000},{operation_type: "lt",value: 5000}]}
- { label: "5000-10000",operation: [{operation_type: "gte",value: 5000},{operation_type: "lt",value: 10000}]}
- { label: ">1000",operation: [{operation_type: "gte",value: 10000}]}
|
|
substring |
用来裁剪字符串使用,比如针对拼接的traceid,裁剪获得各部分的值。 |
- substring:
fail_on_error: false
ignore_missing: true
fields:
- {from: "transaction_id", begin_index: 0,end_index: 38, to: "x_traceid" }
- {from: "transaction_id", begin_index: 0,end_index: 32, to: "x_traceid_begin" }
- {from: "transaction_id", begin_index: -20, to: "x_traceid_spanid" }
|
|
timeout |
该算子主要是为了预防用户配置错误导致采集到过早历史的日志。为了系统的稳定运行,不要采集过早时间的日志做分析,此算子必须配置 。 |
例如,检查当前日志的timestamp字段,这个字段的值是毫秒形式时间戳。如果当前时间减去timestamp对应的时间小于等于120分钟,那么日志正常发送,如果大于120分钟,表示出现异常,日志发送到fail_to_topic。 - timeout
field: "timestamp"
layout: "unix_ms"
rules:
- {operation_type: "lte", value: 120}
|
算子组合使用样例
日志样例
2022-11-15 19:51:43.735 [20221115t115143]|ai||system|system|inner_ip|add|logrecord|aioperationlog(id=12d6ffbd-f371-4222-aa69-167bcd7ba3ee, serviceid=null, identityid=1111111111111, userid=test, timestamp=2022-11-15 19:51:43.67, operationtype=welink_trigger)||y - serviceid:null
算子功能
对于非json格式的如何转化成json直接解析拿到key、value。例如id=12d6ffbd-f371-4222-aa69-167bcd7ba3ee, serviceid=null, identityid=1111111111111, userid=test, timestamp=2022-11-15 19:51:43.67, operationtype=welink_trigger。
算子脚本
- dissect:
tokenizer: '%{time}|%{thread}|%{logversion}|%{uid}|%{sign}|%{traceid}|%{cc}|%{operationlog}|%{bb}|%{aa}'
field: "message"
target_prefix: ""
trim_values: "none"
ignore_failure: true
- replace:
fail_on_error: false
ignore_missing: true
fields:
- {field: "bb",pattern: 'aioperationlog\(',replacement: "{\""}
- {field: "bb",pattern: '\)',replacement: "\"}"}
- {field: "bb",pattern: ' ,',replacement:"\""}
- if:
contains:
"operationlog": "logrecord"
then:
- replace:
fail_on_error: false
ignore_missing: true
fields:
- {field: "bb",pattern: '=',replacement: "\":\""}
- {field: "bb",pattern: ' ',replacement:""}
- {field: "bb",pattern: '\,',replacement: "\",\""}
- decode_json:
ignore_missing: true
ignore_failure: true
fields:
- field: "bb"
filters:
- {target_field: "id",type: "string",filter_keys: ["id"]}
- {target_field: "serviceid",type: "string",filter_keys: ["serviceid"]}
- {target_field: "identityid",type: "string",filter_keys: ["identityid"]}
- {target_field: "userid",type: "string",filter_keys: ["userid"]}
- {target_field: "timestamp",type: "string",filter_keys: ["timestamp"]}
- {target_field: "operationtype",type: "string",filter_keys: ["operationtype"]}
- drop_fields:
fields: ["time","thread","logversion","uid","sign","traceid","cc","aa","bb"]
ignore_missing: true
- drop_event:
when:
not:
equals:
"operationlog": "logrecord"
相关文档
意见反馈
文档内容是否对您有帮助?
如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨