Notes on Fluentd filter and ES plugin issues

Background: a production requirement was to clean business log data and import it into Kibana, while also importing the original logs. Having the service emit two copies of each log directly is clearly unreasonable, so the processing is done where Fluentd collects the data: the original stream is first copied into two, one copy is imported into Kibana as-is, and the other is filtered by a Fluentd filter before being imported into Kibana. Along the way, the filter could not match nested fields; this was solved by upgrading Fluentd and changing the filter syntax. The upgrade in turn broke the old way of naming ES indices, which was solved by switching the ES plugin. Details are recorded below.

Test environment:
OS: Amazon Linux AMI release 2015.03
Fluentd: td-agent 0.12.20

Problem 1: the grep filter conditions do not support nested keys

Problem description

  • The original data is shown below.
    Two filter conditions are needed: type_name and info.Type (which is a nested key).

    {
      "Level": "Error",
      "logtime": 1519444062811430400,
      "@timestamp": "2018-02-24T03:47:42.811Z",
      "utc8": "2018-02-24 11:47:42",
      "type_name": "CostCurrency",
      "gid": 202,
      "sid": 3258,
      "avatar": "4",
      "corplvl": 44,
      "channel": "130134001232",
      "info": {
        "Reason": "AbstractCancelCost",
        "Type": "VI_HC",
        "Value": 20,
        "BefValue": 3139,
        "AftValue": 3119,
        "VIP": 5
      }
    }
  • Searching the documentation first turned up the filter_grep plugin.
    In testing, regexp1 worked but regexp2 did not. I opened an issue and the answer was that the plugin does not support nested keys, and that this regexpN usage is now deprecated. The test configuration is below, followed by commands for confirming the installed versions, since the behaviour is version dependent.

    <match logics.**>
      @type copy
      <store>
        @type elasticsearch
        ...
      </store>
      <store>
        @type relabel
        @label @CostCurrency
      </store>
    </match>
    <label @CostCurrency>
      <filter logics.**>
        @type grep
        regexp1 type_name CostCurrency
        regexp2 info.Type VI_HC
      </filter>
      <match logics.**>
        @type elasticsearch
        ...
      </match>
    </label>
    #td-agent --dry-run -c /etc/td-agent/td-agent.conf   # command to validate the config file
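  • Since the filter behaviour depends on the Fluentd release, it is worth confirming which fluentd and plugin versions the relay is actually running before changing any syntax. A quick check, assuming the standard td-agent wrapper commands:

    # print the bundled fluentd version
    td-agent --version
    # list installed gems to see the fluentd and fluent-plugin-elasticsearch versions
    td-agent-gem list | grep -E 'fluentd|fluent-plugin'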

Solution

  • Upgrade the Fluentd version.
    Install a version >= 0.14.19; nested key support was added starting with that release.
    After upgrading, change the configuration as follows and nested filtering works (a quick way to test it is sketched after the config):
    <filter logics.**>
      @type grep
      #regexp1 type_name CostCurrency   # this form worked on v0.12
      <regexp>
        key type_name
        pattern CostCurrency
      </regexp>
      <regexp>
        key $.info.Type
        pattern ^VI_HC$
      </regexp>
    </filter>
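  • One way to sanity-check the new <regexp> blocks is to validate the config and then push two hand-made events through the relay. This is only a sketch: fluent-cat ships with td-agent (it may live under /opt/td-agent/embedded/bin/), it sends to the forward input on the default 127.0.0.1:24224, and logics.test.event is just a made-up tag that matches logics.**.

    # validate the configuration file first (same command as above)
    td-agent --dry-run -c /etc/td-agent/td-agent.conf
    # send a matching and a non-matching test event; only the first should survive the grep filter
    echo '{"type_name":"CostCurrency","info":{"Type":"VI_HC"}}' | fluent-cat logics.test.event
    echo '{"type_name":"CostCurrency","info":{"Type":"OTHER"}}' | fluent-cat logics.test.event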

Problem 2: after upgrading to v0.14, indices can no longer be named with ${tag_parts[-1]}

Problem description

  • The tags sent by the client td-agents currently look like part1.xxx.part2, and I want events that share the same part1 and part2 to go into the same index, hence this requirement.
    After the upgrade, the previous setting logstash_dateformat logics-${tag_parts[-1]}.%Y.%m.%d no longer takes effect. After testing, ${tag} is the only placeholder that still works, which does not achieve what I need. According to the official documentation, replacing the elasticsearch output with the elasticsearch_dynamic plugin restores this behaviour. The official docs do carry the warning below, though, so I am considering whether the log collection approach needs to change; for now this runs as a test and will be revised if problems come up.

    Please note, this uses Ruby’s eval for every message, so there are performance and security implications.

  • Configuration after the upgrade (an example of the resulting index name follows):

    <store>
      # @type elasticsearch
      @type elasticsearch_dynamic
      host data1.elasticsearch.qa.net
      port 9200
      request_timeout 15s      # defaults to 5s
      reload_connections false
      reload_on_failure true   # defaults to false
      logstash_format true
      logstash_prefix bilogs-${tag_parts[0]}-${tag_parts[2]}
      logstash_dateformat %Y.%m.%d
      time_key time
    </store>
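  • For reference, with logstash_format true the ES plugin builds the index name from logstash_prefix plus the date rendered by logstash_dateformat (joined with - by default), and elasticsearch_dynamic expands the ${...} placeholders per event. So an event tagged part1.xxx.part2 (the tag shape described above) should end up in an index named roughly:

    bilogs-part1-part2-2018.02.24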

Complete configuration

  • The complete relay configuration (a quick index check follows it):
    <source>
      @type forward
      #port 24224
    </source>

    <match debug.**>
      @type stdout
    </match>

    <source>
      @type debug_agent
      bind 127.0.0.1
      port 24230
    </source>

    ##################################################################################

    <match logics.**>
      @type copy
      #@type forest
      #subtype copy
      <store>
        # @type elasticsearch
        @type elasticsearch_dynamic
        host data1.elasticsearch.taiqa.net
        port 9200
        request_timeout 15s      # defaults to 5s
        reload_connections false
        reload_on_failure true   # defaults to false
        logstash_format true
        logstash_prefix bilogs-${tag_parts[0]}-${tag_parts[2]}
        logstash_dateformat %Y.%m.%d
        time_key time
        <buffer>
          @type file
          path /var/log/td-agent/buffer/td-gamex-buffer
          chunk_limit_size 512MB       # default: 8MB (memory) / 256MB (file)
          total_limit_size 32GB        # default: 512MB (memory) / 64GB (file)
          chunk_full_threshold 0.9     # flush a chunk when its size reaches chunk_limit_size * chunk_full_threshold
          compress text                # compression applied to each chunk while events are buffered
          flush_mode default
          flush_interval 15s           # default: 60s
          flush_thread_count 1         # default: 1; number of threads used to write chunks in parallel
          delayed_commit_timeout 60    # seconds until the output plugin decides an async write has failed
          overflow_action throw_exception
          retry_timeout 10m
        </buffer>
      </store>
      <store>
        @type relabel
        @label @CostCurrency
      </store>
    </match>

    <label @CostCurrency>
      <filter logics.**>
        @type grep
        #regexp1 type_name CostCurrency
        <regexp>
          key type_name
          pattern CostCurrency
        </regexp>
        <regexp>
          key $.info.Type
          pattern ^VI_HC$
        </regexp>
      </filter>
      <match logics.**>
        @type elasticsearch
        host data1.elasticsearch.taiqa.net
        port 9200
        request_timeout 15s      # defaults to 5s
        reload_connections false
        reload_on_failure true   # defaults to false
        logstash_format true
        logstash_prefix cost
        logstash_dateformat currency-hc.%Y.%m.%d
        time_key time
        <buffer>
          @type file
          path /var/log/td-agent/buffer/td-cost-buffer
          chunk_limit_size 512MB       # default: 8MB (memory) / 256MB (file)
          total_limit_size 32GB        # default: 512MB (memory) / 64GB (file)
          chunk_full_threshold 0.9     # flush a chunk when its size reaches chunk_limit_size * chunk_full_threshold
          compress text                # compression applied to each chunk while events are buffered
          flush_mode default
          flush_interval 15s           # default: 60s
          flush_thread_count 1         # default: 1; number of threads used to write chunks in parallel
          delayed_commit_timeout 60    # seconds until the output plugin decides an async write has failed
          overflow_action throw_exception
          retry_timeout 10m
        </buffer>
      </match>
    </label>
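  • Once the relay is running, a quick check that both streams are being written is to list the matching indices on the ES host from the configuration above (adjust the patterns if your prefixes differ):

    # bilogs-* comes from the elasticsearch_dynamic store, cost-* from the filtered output
    curl -s 'http://data1.elasticsearch.taiqa.net:9200/_cat/indices/bilogs-*,cost-*?v'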

References

On tag / field rewriting
ES plugin documentation