设置并启动Logstash

Logstash目录布局

本节描述在解包Logstash安装包时创建的默认目录结构。

.zip和.tar.gz包的目录布局

.zip和.tar.gz包是完全独立的。默认情况下,所有文件和目录都包含在主目录中——解压归档文件时创建的目录。

这非常方便,因为您不需要创建任何目录就可以开始使用Logstash,而卸载Logstash就像删除主目录一样简单。但是,建议更改config和logs目录的默认位置,这样以后就不会删除重要的数据。

TypeDescriptionDefault LocationSetting
homeLogstash安装的主目录。{extract.path}- 解压缩归档文件创建的目录
bin二进制脚本,包括启动logstash的logstash和安装插件的logstash-plugin{extract.path}/bin
settings配置文件,包括logstash.ymljvm.options{extract.path}/configpath.settings
logsLog files{extract.path}/logspath.logs
plugins本地,非Ruby-Gem插件文件。每个插件都包含在一个子目录中。建议仅用于开发。{extract.path}/pluginspath.plugins
datalogstash及其插件用于任何持久性需求的数据文件。{extract.path}/datapath.data

Debian和RPM包目录布局

TypeDescriptionDefault LocationSetting
homeLogstash安装的主目录/usr/share/logstash
bin二进制脚本,包括启动logstash的logstash和安装插件的logstash-plugin/usr/share/logstash/bin
settings配置文件,包括logstash.ymljvm.optionsstartup.options/etc/logstashpath.settings
confLogstashpipeline配置文件/etc/logstash/conf.d/*.confSee /etc/logstash/pipelines.yml
logsLog files/var/log/logstashpath.logs
plugins本地,非Ruby-Gem插件文件。每个插件都包含在一个子目录中。建议仅用于开发。/usr/share/logstash/pluginspath.plugins
datalogstash及其插件用于任何持久性需求的数据文件。/var/lib/logstashpath.data

Docker镜像的目录布局

Docker映像是从.tar.gz包中创建的,并遵循类似的目录布局。

TypeDescriptionDefault LocationSetting
homeLogstash安装的主目录/usr/share/logstash
bin二进制脚本,包括启动logstash的logstash和安装插件的logstash-plugin/usr/share/logstash/bin
settings配置文件,包括logstash.ymljvm.options/usr/share/logstash/configpath.settings
confLogstash pipeline配置文件/usr/share/logstash/pipelinepath.config
plugins本地,非Ruby-Gem插件文件。每个插件都包含在一个子目录中。建议仅用于开发。/usr/share/logstash/pluginspath.plugins
datalogstash及其插件用于任何持久性需求的数据文件。/usr/share/logstash/datapath.data

Logstash配置文件

Logstash有两种类型的配置文件:pipeline配置文件(定义Logstash处理管道)和setting文件(指定控制Logstash启动和执行的选项)。

Pipeline配置文件

在定义Logstash处理管道的各个阶段时,您将创建管道配置文件。在deb和rpm中,将管道配置文件放在/etc/logstash/conf的目录下。Logstash尝试只加载/etc/logstash/conf中扩展名为.conf的文件并忽略所有其他文件。

Settings配置文件

在Logstash安装中已经定义了设置文件。Logstash包含以下设置文件:

logstash.yaml

包含Logstash配置指令。您可以在这个文件中设置指令,而不是在命令行中传递指令。您在命令行中设置的任何指令都会覆盖logstash.yaml中相应的设置。查到logstash.yml以获取更多信息。

pipeline.yaml

包含用于在单个Logstash实例中运行多个管道的框架和指令。有关更多信息,请参阅多个管道

jvm.options

包含JVM配置指令。使用此文件设置总堆空间的初始值和最大值。还可以使用此文件设置Logstash的区域设置。在单独的行中指定每个指令。该文件中的所有其他设置都被认为是专家设置。

log4j2.properties

包含log4j2库的默认设置。有关更多信息,请参阅Log4j2配置

startup.options(Linux)

包含系统安装脚本在/usr/share/logstash/bin中使用的选项,用于为您的系统构建适当的启动脚本。在安装Logstash包时,在安装过程的最后执行system-install脚本,并使用在启动时指定的设置。选项,用于设置用户、组、服务名称和服务描述等选项。默认情况下,Logstash服务安装在用户Logstash下。启动。options文件使您更容易安装Logstash服务的多个实例。您可以复制文件并更改特定设置的值。注意这个启动。启动时没有读取选项文件。如果您希望更改Logstash启动脚本(例如,更改Logstash用户或从不同的配置路径读取),则必须重新运行system-install脚本(以root身份)以传入新的设置。

Logstash.yml

您可以在Logstash设置文件logstash.yaml中设置选项,以控制Logstash的执行。例如,您可以指定管道设置、配置文件的位置、日志记录选项和其他设置。当您运行logstash时,logstash文件中的大多数设置也可以作为命令行指令使用。您在命令行中设置的任何指令都会覆盖logstash.yaml文件中相应的设置。

logstash.yaml文件是用YAML编写的。它的位置因平台而异(请参阅Logstash目录布局)。您可以以层次形式指定设置或使用 flat keys。例如,要使用分层形式设置管道批大小和批延迟,可以指定:

pipeline:
  batch:
    size: 125
    delay: 50

要表示与普通键相同的值,可以指定:

pipeline.batch.size: 125
pipeline.batch.delay: 50

logstash.yaml文件还支持环境变量和keystore secrets的bash式填写设置值。

pipeline:
  batch:
    size: ${BATCH_SIZE}
    delay: ${BATCH_DELAY:50}
node:
  name: "node_${LS_NODE_NAME}"
path:
   queue: "/tmp/${QUEUE_DIR:queue}"

注意,支持 $ 符号,在上面的例子中,设置默认批处理延迟为 50,默认 path.queue 为 /tmp/queue。

模块也可以在logstash.yml文件中指定。模块的定义将有这样的格式:

modules:
  - name: MODULE_NAME1
    var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY1: VALUE
    var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY2: VALUE
    var.PLUGIN_TYPE2.PLUGIN_NAME2.KEY1: VALUE
    var.PLUGIN_TYPE3.PLUGIN_NAME3.KEY1: VALUE
  - name: MODULE_NAME2
    var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY1: VALUE
    var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY2: VALUE

如果使用命令行指令 --modules,任何在logstash.yml文件中定义的模块将被忽略。

logstash.yml文件包括以下设置:

SettingDescriptionDefault value
node.nameA descriptive name for the node.Machine’s hostname
path.dataThe directory that Logstash and its plugins use for any persistent needs.LOGSTASH_HOME/data
pipeline.idThe ID of the pipeline.main
pipeline.java_executionUse the Java execution engine.true
pipeline.workersThe number of workers that will, in parallel, execute the filter and output stages of the pipeline. This setting uses the java.lang.Runtime.getRuntime.availableProcessors value as a default if not overridden by pipeline.workers in pipelines.yml or pipeline.workers from logstash.yml. If you have modified this setting and see that events are backing up, or that the CPU is not saturated, consider increasing this number to better utilize machine processing power.Number of the host’s CPU cores
pipeline.batch.sizeThe maximum number of events an individual worker thread will collect from inputs before attempting to execute its filters and outputs. Larger batch sizes are generally more efficient, but come at the cost of increased memory overhead. You may need to increase JVM heap space in the jvm.options config file. See Logstash Configuration Files for more info.125
pipeline.batch.delayWhen creating pipeline event batches, how long in milliseconds to wait for each event before dispatching an undersized batch to pipeline workers.50
pipeline.unsafe_shutdownWhen set to true, forces Logstash to exit during shutdown even if there are still inflight events in memory. By default, Logstash will refuse to quit until all received events have been pushed to the outputs. Enabling this option can lead to data loss during shutdown.false
pipeline.plugin_classloaders(Beta) Load Java plugins in independent classloaders to isolate their dependencies.false
pipeline.orderedSet the pipeline event ordering.Valid options are:auto``true``false``auto will automatically enable ordering if the pipeline.workers setting is also set to 1. true will enforce ordering on the pipeline and prevent logstash from starting if there are multiple workers. false will disable the processing required to preserve order. Ordering will not be guaranteed, but you save the processing cost of preserving order.auto
pipeline.ecs_compatibilitySets the pipeline’s default value for ecs_compatibility, a setting that is available to plugins that implement an ECS compatibility mode for use with the Elastic Common Schema. Possible values are:disabled``v1``v8This option allows the early opt-in (or preemptive opt-out) of ECS compatibility modes in plugins, which is scheduled to be on-by-default in a future major release of Logstash.Values other than disabled are currently considered BETA, and may produce unintended consequences when upgrading Logstash.disabled
path.config主管道的Logstash配置的路径。如果你指定一个目录或通配符,配置文件将按字母顺序从该目录中读取。特定平台. 见 Logstash目录布局
config.string一个字符串,包含要用于主管道的管道配置。使用与配置文件相同的语法。None
config.test_and_exitWhen set to true, checks that the configuration is valid and then exits. Note that grok patterns are not checked for correctness with this setting. Logstash can read multiple config files from a directory. If you combine this setting with log.level: debug, Logstash will log the combined config file, annotating each config block with the source file it came from.false
config.reload.automaticWhen set to true, periodically checks if the configuration has changed and reloads the configuration whenever it is changed. This can also be triggered manually through the SIGHUP signal.false
config.reload.intervalHow often in seconds Logstash checks the config files for changes. Note that the unit qualifier (s) is required.3s
config.debugWhen set to true, shows the fully compiled configuration as a debug log message. You must also set log.level: debug. WARNING: The log message will include any password options passed to plugin configs as plaintext, and may result in plaintext passwords appearing in your logs!false
config.support_escapesWhen set to true, quoted strings will process the following escape sequences: \n becomes a literal newline (ASCII 10). \r becomes a literal carriage return (ASCII 13). \t becomes a literal tab (ASCII 9). \\ becomes a literal backslash \. \" becomes a literal double quotation mark. \' becomes a literal quotation mark.false
modulesWhen configured, modules must be in the nested YAML structure described above this table.None
queue.typeThe internal queuing model to use for event buffering. Specify memory for legacy in-memory based queuing, or persisted for disk-based ACKed queueing (persistent queues).memory
path.queueThe directory path where the data files will be stored when persistent queues are enabled (queue.type: persisted).path.data/queue
queue.page_capacityThe size of the page data files used when persistent queues are enabled (queue.type: persisted). The queue data consists of append-only data files separated into pages.64mb
queue.max_eventsThe maximum number of unread events in the queue when persistent queues are enabled (queue.type: persisted).0 (unlimited)
queue.max_bytesThe total capacity of the queue in number of bytes. Make sure the capacity of your disk drive is greater than the value you specify here. If both queue.max_events and queue.max_bytes are specified, Logstash uses whichever criteria is reached first.1024mb (1g)
queue.checkpoint.acksThe maximum number of ACKed events before forcing a checkpoint when persistent queues are enabled (queue.type: persisted). Specify queue.checkpoint.acks: 0 to set this value to unlimited.1024
queue.checkpoint.writesThe maximum number of written events before forcing a checkpoint when persistent queues are enabled (queue.type: persisted). Specify queue.checkpoint.writes: 0 to set this value to unlimited.1024
queue.checkpoint.retryWhen enabled, Logstash will retry once per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on filesystems with non-standard behavior such as SANs and is not recommended except in those specific circumstances.false
queue.drainWhen enabled, Logstash waits until the persistent queue is drained before shutting down.false
dead_letter_queue.enableFlag to instruct Logstash to enable the DLQ feature supported by plugins.false
dead_letter_queue.max_bytesThe maximum size of each dead letter queue. Entries will be dropped if they would increase the size of the dead letter queue beyond this setting.1024mb
path.dead_letter_queueThe directory path where the data files will be stored for the dead-letter queue.path.data/dead_letter_queue
http.hostThe bind address for the metrics REST endpoint."127.0.0.1"
http.portThe bind port for the metrics REST endpoint.9600
log.levelThe log level. Valid options are:fatal``error``warn``info``debug``traceinfo
log.formatThe log format. Set to json to log in JSON format, or plain to use Object#.inspect.plain
path.logsThe directory where Logstash will write its log to.LOGSTASH_HOME/logs
pipeline.separate_logsThis a boolean setting to enable separation of logs per pipeline in different log files. If enabled Logstash will create a different log file for each pipeline, using the pipeline.id as name of the file. The destination directory is taken from the path.logs setting. When there are many pipelines configured in Logstash, separating each log lines per pipeline could be helpful in case you need to troubleshoot what’s happening in a single pipeline, without interference of the other ones.false
path.pluginsWhere to find custom plugins. You can specify this setting multiple times to include multiple paths. Plugins are expected to be in a specific directory hierarchy: PATH/logstash/TYPE/NAME.rb where TYPE is inputs, filters, outputs, or codecs, and NAME is the name of the plugin.Platform-specific. See Logstash Directory Layout.

用于安全设置的秘密密钥库

当你配置Logstash时,你可能需要指定敏感的设置或配置,如密码。与其依靠文件系统权限来保护这些值,你可以使用Logstash keystore来安全地存储用于配置设置的秘密值。

在向keystore添加密钥及其秘密值后,你可以在配置敏感设置时使用密钥来代替秘密值。

引用密钥的语法与环境变量的语法相同:

${KEY}

其中KEY是钥匙的名称。

例子

假设keystore包含一个名为ES_PWD的密钥,其值为yourelasticsearchpassword

在配置文件中使用:

output { elasticsearch {...password => "${ES_PWD}" } } }

logstash.yaml中使用

xpack.management.elasticsearch.password: ${ES_PWD}

请注意,Logstash keystore与Elasticsearch keystore不同。Elasticsearch keystore让你按名称存储elasticsearch.yml值,而Logstash keystore让你指定任意的名称,你可以在Logstash配置中引用。

关于如何生成keystore,见:https://www.elastic.co/guide/en/logstash/current/keystore.html

从命令行运行Logstash

要从命令行运行Logstash,使用以下命令:

bin/logstash [options]

其中options命令行指令,你可以指定它来控制Logstash的执行。bin目录的位置因平台而异。参见Logstash目录布局以找到你系统中bin\logstash的位置。

下面的例子运行Logstash并加载mypipeline.conf文件中定义的Logstash配置:

bin/logstash -f mypipeline.conf

你在命令行中设置的任何指令都会覆盖logstash.yml中的相应设置,但文件本身不会被改变。它在随后的 Logstash 运行中保持原样。

当你在测试Logstash时,指定命令行选项是很有用的。然而,在生产环境中,我们建议你使用 logstash.yml 来控制 Logstash 的执行。使用设置文件可以让你更容易地指定多个选项,而且它为你提供了一个单一的、版本化的文件,你可以用它来启动Logstash保证每次运行是一致的。

Command-Line Flags

Logstash有以下命令。你可以使用--help指令来显示这些信息。

  • --node.name NAME

指定这个Logstash实例的名称。如果没有给出值,它将默认为当前的主机名。

  • -f, --path.config CONFIG_PATH

从一个特定的文件或目录加载Logstash配置。如果给出了一个目录,该目录中的所有文件将按词汇表顺序串联,然后作为一个单一的配置文件进行解析。不支持多次指定这个指令。如果你多次指定这个指令,Logstash使用最后一次出现的标志(例如,-f foo -f bar-f bar相同)。

你可以指定通配符(globs),任何匹配的文件将按照上述顺序加载。例如,你可以使用通配符功能,按名称加载特定的文件。

bin/logstash --debug -f '/tmp/{one,two,three}'

通过这个命令,Logstash将三个配置文件/tmp/one/tmp/two/tmp/three串联起来,并将它们解析为一个单独的配置。

  • -e, --config.string CONFIG_STRING

    使用给定的字符串作为配置数据。与配置文件的语法相同。如果没有指定输入,那么将使用以下内容作为默认输入:input { stdin { type => stdin }。如果没有指定输出,则使用以下内容作为默认输出:output { stdout { codec => rubydebug },如果你希望同时使用这两个默认值,请为-e标志使用空字符串。默认是nil。

  • --java-execution

    为这个选项指定 "false "可以恢复到传统的Ruby执行引擎,而不是默认的Java执行引擎。

  • --plugin-classloaders

    (Beta)在独立的类加载器中加载Java插件,以隔离其依赖关系。

  • --modules

    启动指定的模块。与-M选项一起使用,为指定模块的默认变量赋值。如果在命令行中使用--moduleslogstash.yml中的任何模块都将被忽略,那里的任何设置也是如此。这个标志与-f-e标志是相互排斥的。只能指定-f-e-modules中的一个。多个模块可以用逗号隔开,或者多次调用--modules标志来指定。

  • -M, --modules.variable

    为一个模块的可配置选项分配一个值。分配变量的格式是 -M "MODULE_NAME.var.PLUGIN_TYPE.PLUGIN_NAME.KEY_NAME=value"对于Logstash变量来说。对于其他设置,它将是-M "MODULE_NAME.KEY_NAME.SUB_KEYNAME=value"-M标志可以根据需要多次使用。如果没有指定-M选项,那么将使用该设置的默认值。-M标志只与--modules标志一起使用。如果没有--modules标志,它将被忽略。

  • --pipeline.id ID

    Sets the ID of pipeline. The default is main.

  • -w, --pipeline.workers COUNT

    设置要运行的管道工作者的数量。这个选项设置了并行执行管道的过滤和输出阶段的工作者的数量。如果你发现事件正在备份,或者CPU没有饱和,可以考虑增加这个数字,以更好地利用机器的处理能力。默认是主机的CPU核心数。

  • --pipeline.ordered ORDERED

    保留事件的顺序。可能的值是 "自动"(默认)、"true "和 "false"。这个设置只有在流水线使用单个工作者时才有效。注意,当启用时,它可能会影响过滤器和输出处理的性能。如果pipeline.workers设置为1auto选项将自动启用排序。使用true在管道上启用排序,如果有多个工作者,则防止logstash启动。使用 false 来禁用任何为保护排序所需的额外处理。

  • -b, --pipeline.batch.size SIZE

    管道工作的批次大小。该选项定义了单个工人线程在试图执行其过滤器和输出之前从输入中收集的最大事件数。默认是125个事件。较大的批处理量通常更有效,但代价是增加内存开销。你可能需要在jvm.options配置文件中增加JVM的堆空间。参见Logstash配置文件了解更多信息。

  • -u, --pipeline.batch.delay DELAY_IN_MS

    当创建管道批处理时,在轮询下一个事件时要等待多长时间。这个选项定义了在轮询下一个事件时要等待多长时间(以毫秒为单位),然后再将一个尺寸不足的批次分配给过滤器和输出。默认是50ms。

  • --pipeline.unsafe_shutdown

    强制Logstash在关机时退出,即使内存中仍有机内事件。默认情况下,Logstash会拒绝退出,直到所有收到的事件都被推送到输出端。启用该选项会导致关机期间的数据丢失。

  • --path.data PATH

    这应该指向一个可写的目录。当Logstash需要存储数据时,它将使用这个目录。插件也可以访问这个路径。默认是Logstash主页下的data目录。

  • -p, --path.plugins PATH

    找到自定义插件的路径。这个标志可以多次给出以包括多个路径。插件应该在一个特定的目录层次中:PATH/logstash/TYPE/NAME.rb,其中TYPEinputsfiltersoutputscodecs,而NAME是插件的名称。

  • -l, --path.logs PATH

    写入Logstash内部日志的目录。

  • --log.level LEVEL

    设置Logstash的日志级别。可能的值是:fatal:记录非常严重的错误信息,通常会导致应用程序中止error:记录错误warn:记录警告info:记录详细的信息(这是默认值)debug:记录调试信息(针对开发者)trace:记录调试信息之外的更细粒度的信息

  • --config.debug

    将完全编译的配置显示为调试日志信息(你还必须启用--log.level=debug)。警告:日志信息将包括以明文形式传递给插件配置的任何密码选项,并可能导致明文密码出现在你的日志中!

  • -i, --interactive SHELL

    下降到shell,而不是正常运行。有效的shell是 "irb "和 "pry"。

  • -V, --version

    输出logstash的版本

  • -t, --config.test_and_exit

    检查配置的语法是否有效,然后退出。注意,grok模式不会因为这个标志而被检查是否正确。Logstash可以从一个目录中读取多个配置文件。如果你把这个标志和 --log.level=debug结合起来,Logstash将记录合并的配置文件,并在每个配置块上注释它来自的源文件。

  • -r, --config.reload.automatic

    监控配置的变化,每当配置发生变化时就重新加载。注意:使用SIGHUP来手动重新加载配置。默认为false。

  • --config.reload.interval RELOAD_INTERVAL

    轮询配置位置变化的频率。默认值是 "3s"。注意,单位修饰符(s)是必需的。

  • --http.host HTTP_HOST

    Web API绑定主机。这个选项指定了度量衡REST端点的绑定地址。默认是 "127.0.0.1"。

  • --http.port HTTP_PORT

    Web API http端口。这个选项指定了度量衡REST端点的绑定端口。默认是9600-9700。该设置接受格式为9600-9700的范围。Logstash会选取第一个可用的端口。

  • --log.format FORMAT

    指定Logstash是以JSON形式(每行一个事件)还是以纯文本形式(使用Ruby的Object#inspect)写入自己的日志。默认是 "纯文本"。

  • --path.settings SETTINGS_DIR

    设置包含logstash.yml[设置文件](https://www.elastic.co/guide/en/logstash/current/logstash-settings-file.html)以及log4j日志配置的目录。这也可以通过LS_SETTINGS_DIR环境变量设置。默认是Logstash首页下的config目录。

  • --enable-local-plugin-development

    这个标志使开发者能够更新他们的本地Gemfile,而不会遇到由冻结的lockfile引起的问题。当你在本地开发/测试插件时,这个标志会很有帮助。

这个标志只适用于Logstash开发者。最终用户不应该需要它。

  • -h, --help

    Print help

在Docker上运行Logstash

在Docker中配置Logstash

docker run --rm -it -v ~/pipeline/:/usr/share/logstash/pipeline/

配置Logstash

配置文件的结构

Logstash配置文件有一个单独的部分,用于您想要添加到事件处理管道中的每一种类型的插件。例如:

# This is a comment. You should use comments to describe
# parts of your configuration.
input {
  ...
}

filter {
  ...
}

output {
  ...
}

每个部分包含一个或多个插件的配置选项。如果指定多个筛选器,则按它们在配置文件中的出现顺序应用它们。

插件配置

插件的配置包括插件名和该插件的设置块。例如,这个输入部分配置两个文件输入:

input {
  file {
    path => "/var/log/messages"
    type => "syslog"
  }

  file {
    path => "/var/log/apache/access.log"
    type => "apache"
  }
}

在本例中,为每个文件输入配置了两个设置:路径和类型。

您可以配置的设置根据插件类型而异。有关每个插件的信息,请参见输入插件输出插件过滤插件编解码器插件

值类型

插件可以要求设置的值是某种类型,例如 boolean, list, 或者 hash。支持以下值类型。

Array

这个类型现在已经被弃用了,取而代之的是使用标准类型,比如string,插件中定义了:list => true属性,以便更好地进行类型检查。它仍然需要处理不需要类型检查的散列或混合类型列表。

例如:

users => [ {id => 1, name => bob}, {id => 2, name => jane} ]
Lists

不是类型本身的类型,而是属性类型可以有的类型。这使得输入检查多个值成为可能。插件作者可以通过在声明参数时指定:list => true来启用列表检查。

例如:

  path => [ "/var/log/messages", "/var/log/*.log" ]
  uris => [ "http://elastic.co", "http://example.net" ]

这个例子配置path, path是一个字符串,它是一个包含三个字符串每个元素的列表。它还将uri参数配置为uri列表,如果提供的任何uri无效,则失败。

Boolean

布尔值必须为真或假。注意,true和false关键字没有用引号括起来。

例如:

 ssl_enable => true
Bytes

字节字段是表示有效字节单位的字符串字段。在插件选项中声明特定大小是一种方便的方法。同时支持SI (k M G T P E Z Y)和二进制(Ki Mi Gi Ti Pi Ei Zi Yi)单元。二进制单位是base-1024,国际单位制单位是base-1000。该字段不区分大小写,并且在值和单位之间接受空格。如果没有指定单位,则整数字符串表示字节数。

例子:

  my_bytes => "1113"   # 1113 bytes
  my_bytes => "10MiB"  # 10485760 bytes
  my_bytes => "100kib" # 102400 bytes
  my_bytes => "180 mb" # 180000000 bytes
Codec

编解码器是用于表示数据的Logstash编解码器的名称。编解码器可用于输入和输出。

输入编解码器提供了一种方便的方法,在数据进入输入之前对其进行解码。输出编解码器提供了一种方便的方法,可以在数据离开输出之前对其进行编码。使用输入或输出编解码器可以避免在Logstash管道中使用单独的过滤器。

可用编解码器的列表可以在编解码器插件页面找到。

例子:

 codec => "json"
Hash

哈希是按格式“field1”=>“value1”指定的键值对的集合。请注意,多个键值条目由空格而不是逗号分隔。

例子:

match => {
  "field1" => "value1"
  "field2" => "value2"
  ...
}
# or as a single line. No commas between entries:
match => { "field1" => "value1" "field2" => "value2" }

Number

数字必须是有效的数值(浮点数或整数)。

例子:

  port => 33
Password

密码是一个带有单个值的字符串,不会被记录或打印。

例子:

  my_password => "password"
URI

URI可以是像http://elastic.co/这样的完整URL,也可以是像foobar这样的简单标识符。如果URI包含密码(如http://user:pass@example.net),则不会记录或打印URI的密码部分。

例子:

 my_uri => "http://foo:bar@example.net"
Path

路径是表示有效操作系统路径的字符串。

例子:

 my_path => "/tmp/logstash"
String

字符串必须是单个字符序列。请注意字符串值是用双引号或单引号括起来的。

转义序列

缺省情况下,不启用转义序列。如果您希望在引号字符串中使用转义序列,您需要在logstash.yml中设置config.support_escapes:true。当为true时,带引号的字符串(双引号和单引号)将进行如下转换:

TextResult
\rcarriage return (ASCII 13)
\nnew line (ASCII 10)
\ttab (ASCII 9)
\backslash (ASCII 92)
"double quote (ASCII 34)
'single quote (ASCII 39)

例子:

  name => "Hello world"
  name => 'It\'s a beautiful day'
Field Reference

Field Reference是一个特殊的String值,表示事件中字段的路径,例如@timestamp或[@timestamp]引用顶级字段,或[client][ip]访问嵌套字段。字段引用深度潜水提供了关于字段引用结构的详细信息。当作为配置选项提供字段引用时,需要用引号括起来,特殊字符必须按照与String相同的规则进行转义。

Comments

注释与perl、ruby和python中的注释相同。注释以#字符开始,不需要位于行首。例如:

# this is a comment

input { # comments can appear at the end of a line, too
  # ...
}

访问配置中的事件数据和字段

logstash代理是一个处理管道,有3个阶段:输入→过滤器→输出。输入生成事件,过滤器修改事件,输出将事件发送到其他地方。

所有事件都有属性。例如,一个apache访问日志将包含状态代码(200,404)、请求路径("/","index.html")、HTTP动词(GET, POST)、客户端IP地址等。Logstash将这些属性称为“字段”。

Logstash中的一些配置选项需要存在字段才能运行。因为输入会生成事件,所以在输入块中没有要计算的字段——它们还不存在!

由于它们依赖于事件和字段,下面的配置选项只能在过滤器和输出块中工作。

下面描述的字段引用、sprintf格式和条件式在输入块中不起作用。

Field references

能够通过名称引用字段通常是有用的。为此,可以使用Logstash字段引用语法。

访问字段的基本语法是[fieldname]。如果引用的是顶级字段,可以省略[],只使用字段名。要引用一个嵌套字段,您需要指定该字段的完整路径:[顶级字段] [嵌套字段]。

例如,下面的事件有五个顶级字段(代理、ip、请求、响应、ua)和三个嵌套字段(状态、字节、操作系统)。

{
  "agent": "Mozilla/5.0 (compatible; MSIE 9.0)",
  "ip": "192.168.24.44",
  "request": "/index.html",
  "response": {
    "status": 200,
    "bytes": 52353
  },
  "ua": {
    "os": "Windows 7"
  }
}

要引用os字段,你要指定[ua] [os]。要引用一个顶级字段,如request,你可以简单地指定字段名。

更详细的信息,请参见字段引用深度研究。

sprintf format

字段引用格式也用于Logstash所调用的sprintf格式。这种格式使您能够从其他字符串中引用字段值。例如,statsd输出有一个增量设置,使您能够通过状态代码保持apache日志的计数:

output {
  statsd {
    increment => "apache.%{[response][status]}"
  }
}

类似地,您可以将@timestamp字段中的时间戳转换为字符串。不要在花括号中指定字段名,而是使用+FORMAT语法,其中FORMAT是时间格式。

例如,如果您想根据事件的日期、小时和类型字段使用文件输出写入日志:

output {
  file {
    path => "/var/log/%{type}.%{+yyyy.MM.dd.HH}"
  }
}

Conditionals

有时你想过滤或只在某些条件下输出一个事件。为此,你可以使用一个条件。

Logstash中的条件式的外观和作用与编程语言中的相同。条件式支持if、else if和else语句,并且可以嵌套。

条件的语法是:

if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

一个表达式是什么?比较测试、布尔逻辑等等!

你可以使用以下比较操作符:

  • equality: ==, !=, <, >, <=, >=
  • regexp: =~, !~ (checks a pattern on the right against a string value on the left)
  • inclusion: in, not in

支持的布尔运算符有:

  • and, or, nand, xor

支持的一元操作符有:

  • !

表达式可以很长很复杂。表达式可以包含其他表达式,可以用!来否定表达式,还可以用圆括号(...)来分组。

例如,下面的条件使用mutate过滤器,如果字段动作的值为login,则删除字段secret。

filter {
  if [action] == "login" {
    mutate { remove_field => "secret" }
  }
}

你可以在一个条件中指定多个表达式:

output {
  # Send production errors to pagerduty
  if [loglevel] == "ERROR" and [deployment] == "production" {
    pagerduty {
    ...
    }
  }
}

你可以使用in操作符来测试一个字段是否包含一个特定的字符串、键或列表元素。请注意,in的语义可以根据目标类型而变化。例如,当应用于一个字符串时,in的意思是 "是一个子串"。当应用于一个集合类型时,in意味着 "集合包含确切的值"。

filter {
  if [foo] in [foobar] {
    mutate { add_tag => "field in field" }
  }
  if [foo] in "foo" {
    mutate { add_tag => "field in string" }
  }
  if "hello" in [greeting] {
    mutate { add_tag => "string in field" }
  }
  if [foo] in ["hello", "world", "foo"] {
    mutate { add_tag => "field in list" }
  }
  if [missing] in [alsomissing] {
    mutate { add_tag => "shouldnotexist" }
  }
  if !("foo" in ["hello", "world"]) {
    mutate { add_tag => "shouldexist" }
  }
}

在条件句中使用not也是一样的。例如,当grok成功时,你可以使用not in只将事件路由到Elasticsearch:

output {
  if "_grokparsefailure" not in [tags] {
    elasticsearch { ... }
  }
}

你可以检查一个特定字段的存在,但目前没有办法区分一个不存在的字段和一个简单的假字段。表达式if [foo] 在以下情况下返回false。

  • [foo] doesn’t exist in the event,
  • [foo] exists in the event, but is false, or
  • [foo] exists in the event, but is null

关于更复杂的例子,请看使用条件语句

目前不支持条件式中的Sprintf日期/时间格式。有一个使用@metadata字段的变通方法。更多的细节和例子请参见条件式中的sprintf日期/时间格式

The @metadata field

在Logstash中,有一个叫做@metadata的特殊字段。在输出时,@metadata的内容并不是你的任何事件的一部分,这使得它非常适合用于条件反射,或者用字段引用和sprintf格式化来扩展和构建事件字段。

这个配置文件从STDIN产生事件。无论你输入什么都会成为事件中的消息字段。过滤器块中的mutate事件增加了一些字段,一些嵌套在@metadata字段中。

input { stdin { } }

filter {
  mutate { add_field => { "show" => "This data will be in the output" } }
  mutate { add_field => { "[@metadata][test]" => "Hello" } }
  mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
}

output {
  if [@metadata][test] == "Hello" {
    stdout { codec => rubydebug }
  }
}

让我们看看会有什么结果:

$ bin/logstash -f ../test.conf
Pipeline main started
asdf
{
    "@timestamp" => 2016-06-30T02:42:51.496Z,
      "@version" => "1",
          "host" => "example.com",
          "show" => "This data will be in the output",
       "message" => "asdf"
}

输入的“asdf”成为消息字段的内容,条件成功地评估嵌套在@metadata字段中的测试字段的内容。但是输出没有显示一个名为@metadata的字段或其内容。

rubydebug编解码器允许你显示@metadata字段的内容,如果你添加一个配置指令,metadata => true:

stdout { codec => rubydebug { metadata => true } }

让我们看看这个变化后的输出是什么样子:

$ bin/logstash -f ../test.conf
Pipeline main started
asdf
{
    "@timestamp" => 2016-06-30T02:46:48.565Z,
     "@metadata" => {
           "test" => "Hello",
        "no_show" => "This data will not be in the output"
    },
      "@version" => "1",
          "host" => "example.com",
          "show" => "This data will be in the output",
       "message" => "asdf"
}

现在您可以看到@metadata字段及其子字段。

只有rubydebug编解码器允许你显示@metadata字段的内容。

当你需要一个临时字段但又不希望它出现在最终输出中时,就可以使用@metadata字段。

也许这个新字段最常见的使用情况之一是与日期过滤器和有一个临时的时间戳。

这个配置文件已被简化,但使用Apache和Nginx网络服务器常用的时间戳格式。在过去,你必须自己删除时间戳字段,在使用它覆盖@timestamp字段后。有了@metadata字段,这就不再需要了。

input { stdin { } }

filter {
  grok { match => [ "message", "%{HTTPDATE:[@metadata][timestamp]}" ] }
  date { match => [ "[@metadata][timestamp]", "dd/MMM/yyyy:HH:mm:ss Z" ] }
}

output {
  stdout { codec => rubydebug }
}

注意,这个配置将提取的日期放入grok过滤器的[@metadata] [timestamp]字段中。让我们给这个配置提供一个样本日期字符串,看看会有什么结果。

$ bin/logstash -f ../test.conf
Pipeline main started
02/Mar/2014:15:36:43 +0100
{
    "@timestamp" => 2014-03-02T14:36:43.000Z,
      "@version" => "1",
          "host" => "example.com",
       "message" => "02/Mar/2014:15:36:43 +0100"
}

这就是了! 在输出中没有多余的字段,而且配置文件更简洁,因为你不必在日期过滤器中转换后删除 "时间戳 "字段。

另一个用例是CouchDB变化输入插件。这个插件自动捕捉CouchDB文档字段元数据,并将其放入输入插件本身的@metadata字段中。当事件通过被Elasticsearch索引时,Elasticsearch输出插件允许你指定动作(删除、更新、插入等)和document_id,像这样。

output {
  elasticsearch {
    action => "%{[@metadata][action]}"
    document_id => "%{[@metadata][_id]}"
    hosts => ["example.com"]
    index => "index_name"
    protocol => "http"
  }
}

sprintf data/time formate in conditionals

目前不支持条件中的Sprintf日期/时间格式,但有一个变通方法。把日期计算放在一个字段中,这样你就可以在条件中使用字段引用。

例子:

直接使用sprintf时间格式来添加一个基于摄取时间的字段是行不通的。

----------
# non-working example
filter{
  if "%{+HH}:%{+mm}" < "16:30" {
    mutate {
      add_field => { "string_compare" => "%{+HH}:%{+mm} is before 16:30" }
    }
  }
}
----------

这个变通方法给你带来了预期的结果:

filter {
  mutate{
     add_field => {
      "[@metadata][time]" => "%{+HH}:%{+mm}"
     }
  }
  if [@metadata][time] < "16:30" {
    mutate {
      add_field => {
        "string_compare" => "%{+HH}:%{+mm} is before 16:30"
      }
    }
  }
}

在配置文件中使用环境变量

概述

  • 你可以通过使用$在Logstash插件的配置中设置环境变量引用。
  • 在Logstash启动时,每个引用将被环境变量的值所取代。
  • 该替换是区分大小写的。
  • 对未定义变量的引用会引发Logstash配置错误。
  • 你可以通过使用$的形式给出一个默认值。如果环境变量是未定义的,Logstash会使用默认值。
  • 你可以在任何插件选项类型中添加环境变量引用:字符串、数字、布尔值、数组或哈希。
  • 环境变量是不可改变的。如果你更新了环境变量,你将不得不重新启动Logstash以获取更新的值。

环境变量引用只支持在插件配置中,不支持在条件中。解决这个限制的一个方法是添加一个带有环境变量值的新事件字段,并在条件中使用该新字段。

例子

下面的例子告诉你如何使用环境变量来设置一些常用的配置选项的值。

设置TCP端口

下面是一个使用环境变量设置TCP端口的示例:

input {
  tcp {
    port => "${TCP_PORT}"
  }
}

现在让我们设置TCP_PORT的值:

export TCP_PORT=12345

在启动时,Logstash使用以下配置:

input {
  tcp {
    port => 12345
  }
}

如果没有设置TCP_PORT环境变量,Logstash将返回一个配置错误。

你可以通过指定一个默认值来解决这个问题:

input {
  tcp {
    port => "${TCP_PORT:54321}"
  }
}

现在,如果变量未定义,Logstash不会返回一个配置错误,而是使用默认值:

input {
  tcp {
    port => 54321
  }
}

如果定义了环境变量,Logstash将使用为变量指定的值而不是默认值。

设置标签的值

下面是一个使用环境变量来设置标签值的例子:

filter {
  mutate {
    add_tag => [ "tag1", "${ENV_TAG}" ]
  }
}

让我们设置ENV_TAG的值:

export ENV_TAG="tag2"

在启动时,Logstash使用以下配置:

filter {
  mutate {
    add_tag => [ "tag1", "tag2" ]
  }
}
设置文件路径

下面是一个使用环境变量来设置日志文件路径的例子:

filter {
  mutate {
    add_field => {
      "my_path" => "${HOME}/file.log"
    }
  }
}

让我们设置HOME的值:

export HOME="/path"

在启动时,Logstash使用以下配置:

filter {
  mutate {
    add_field => {
      "my_path" => "/path/file.log"
    }
  }
}

Logstach配置案例

这些示例说明了如何配置Logstash来过滤事件、处理Apache日志和syslog消息,并使用条件来控制过滤器或输出所处理的事件。

如果您需要帮助构建grok模式,请尝试grok调试器。Grok调试器是基于基本许可证的X-Pack特性,因此可以免费使用。

配置过滤

过滤器是一种内联处理机制,可以灵活地对数据进行切片,以满足您的需求。让我们来看看一些过滤器的作用。下面的配置文件设置grok和date过滤器。

input { stdin { } }

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}

运行Logstash配置如下:

bin/logstash -f logstash-filter.conf

现在,将以下行粘贴到终端并按回车键,以便它将由stdin输入处理:

127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"

你应该看到一些返回到标准输出的东西,看起来像这样:

{
        "message" => "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"",
     "@timestamp" => "2013-12-11T08:01:45.000Z",
       "@version" => "1",
           "host" => "cadenza",
       "clientip" => "127.0.0.1",
          "ident" => "-",
           "auth" => "-",
      "timestamp" => "11/Dec/2013:00:01:45 -0800",
           "verb" => "GET",
        "request" => "/xampp/status.php",
    "httpversion" => "1.1",
       "response" => "200",
          "bytes" => "3891",
       "referrer" => "\"http://cadenza/xampp/navi.php\"",
          "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\""
}

如您所见,Logstash(在grok过滤器的帮助下)能够解析日志行(它碰巧是Apache的“组合日志”格式),并将其分解为许多不同的离散信息位。一旦您开始查询和分析日志数据,这将非常有用。例如,您将能够轻松地运行关于HTTP响应代码、IP地址、引用者等的报告。Logstash中包含了许多现成的grok模式,因此,如果您需要解析一种常见的日志格式,很可能已经有人为您完成了这些工作。有关更多信息,请参阅GitHub上的Logstash grok模式列表

本例中使用的另一个过滤器是日期过滤器。这个过滤器解析出一个时间戳,并将其用作事件的时间戳(无论您在什么时候接收日志数据)。您会注意到,本例中的@timestamp字段被设置为2013年12月11日,即使Logstash在之后的某个时间点接收事件。当回填日志时,这很方便。它使您能够告诉Logstash“使用此值作为此事件的时间戳”。

处理Apache日志

让我们做一些实际有用的事情:处理apache2访问日志文件!我们将从本地主机上的文件中读取输入,并根据需要使用条件来处理事件。首先,创建一个类似logstash-apache.conf的文件,包含以下内容(你可以根据需要更改日志的文件路径):

input {
  file {
    path => "/tmp/access_log"
    start_position => "beginning"
  }
}

filter {
  if [path] =~ "access" {
    mutate { replace => { "type" => "apache_access" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
  stdout { codec => rubydebug }
}

然后,用以下日志条目创建上面配置的输入文件(在这个例子中,"/tmp/access_log")(或者从你自己的web服务器上使用一些):

71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3"
134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)"
98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"

现在,运行带有-f指令的Logstash以传递到配置文件中:

bin/logstash -f logstash-apache.conf

现在你应该在Elasticsearch中看到你的apache日志数据了! Logstash打开并读取指定的输入文件,处理它遇到的每个事件。任何记录在该文件中的额外行也将被捕获,由Logstash作为事件处理,并存储在Elasticsearch中。作为额外的奖励,它们在存储时,字段 "type "被设置为 "apache_access"(这是由输入配置中的type⇒"apache_access "行完成的)。

在这个配置中,Logstash只监视apache access_log,但要同时监视access_log和error_log(实际上是任何与*log匹配的文件)是很容易的,只要改变上述配置中的一行即可:

input {
  file {
    path => "/tmp/*_log"
...

当您重新启动Logstash时,它将同时处理错误和访问日志。但是,如果您检查数据(可能使用elasticsearch-kopf),您将看到access_log被分割成离散的字段,而error_log则没有。这是因为我们使用了grok过滤器来匹配标准的组合apache日志格式,并自动将数据分割为单独的字段。如果我们可以根据它的格式来控制一行的解析方式,那不是很好吗?嗯,我们可以…

注意,Logstash没有重新处理在access_log文件中已经看到的事件。当从文件中读取数据时,Logstash会保存它的位置,并且只在添加新行时处理它们。整洁!

使用条件

您可以使用条件来控制筛选器或输出处理哪些事件。例如,可以根据事件出现在哪个文件(access_log、error_log和其他以“log”结尾的随机文件)对每个事件进行标记。

input {
  file {
    path => "/tmp/*_log"
  }
}

filter {
  if [path] =~ "access" {
    mutate { replace => { type => "apache_access" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
      match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  } else if [path] =~ "error" {
    mutate { replace => { type => "apache_error" } }
  } else {
    mutate { replace => { type => "random_logs" } }
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}

这个示例使用type字段标记所有事件,但实际上并没有解析错误或随机文件。有很多类型的错误日志,它们应该如何标记取决于你使用的是什么日志。

类似地,您可以使用条件将事件指向特定的输出。例如,你可以:

  • 提醒nagios任何状态为5xx的apache事件
  • 记录任何4xx状态到Elasticsearch
  • 通过statsd记录所有状态代码

要告诉nagios任何具有5xx状态码的http事件,首先需要检查type字段的值。如果是apache,那么你可以检查status字段是否包含一个5xx错误。如果是,将其发送到nagios。如果不是5xx错误,检查状态字段是否包含4xx错误。如果是,将其发送到Elasticsearch。最后,不管status字段包含什么,将所有apache状态代码发送到statsd:

output {
  if [type] == "apache" {
    if [status] =~ /^5\d\d/ {
      nagios { ...  }
    } else if [status] =~ /^4\d\d/ {
      elasticsearch { ... }
    }
    statsd { increment => "apache.%{status}" }
  }
}

处理系统日志

Syslog是Logstash最常见的用例之一,而且它处理得非常好(只要日志行大致符合RFC3164的要求)。Syslog是事实上的UNIX网络日志标准,从客户机发送消息到本地文件,或者通过rsyslog发送到集中的日志服务器。在这个例子中,你不需要一个有效的syslog实例;我们将从命令行中伪造它,这样你就能感觉到会发生什么。

首先,让我们为Logstash + syslog制作一个简单的配置文件,称为logstash-syslog.conf。

input {
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}

通常,客户机将连接到端口5000上的Logstash实例并发送其消息。对于本例,我们将telnet到Logstash并输入日志行(类似于我们之前在STDIN中输入日志行的方式)。打开另一个shell窗口与Logstash syslog输入进行交互,并输入以下命令:

telnet localhost 5000

复制并粘贴下面的行作为示例。(您可以自己尝试一些,但请记住,如果grok过滤器对您的数据不正确,它们可能无法解析)。

Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied
Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)
Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.
Dec 9 12:11:43 test main[31499]: xxxx

现在您应该看到Logstash在处理和解析消息时的原始shell中的输出!

{
                 "message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
              "@timestamp" => "2013-12-23T22:30:01.000Z",
                "@version" => "1",
                    "type" => "syslog",
                    "host" => "0:0:0:0:0:0:0:1:52617",
        "syslog_timestamp" => "Dec 23 14:30:01",
         "syslog_hostname" => "louis",
          "syslog_program" => "CRON",
              "syslog_pid" => "619",
          "syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
             "received_at" => "2013-12-23 22:49:22 UTC",
           "received_from" => "0:0:0:0:0:0:0:1:52617",
    "syslog_severity_code" => 5,
    "syslog_facility_code" => 1,
         "syslog_facility" => "user-level",
         "syslog_severity" => "notice"
}

安全配置

Logstash Elasticsearch插件(输出、输入、过滤和监控)支持通过HTTPS进行身份验证和加密。

要在安全集群中使用Logstash,您需要为Logstash配置身份验证凭据。如果身份验证失败,Logstash将抛出一个异常,处理管道将停止。

如果集群中启用了加密,还需要在Logstash配置中启用TLS/SSL。

如果希望使用X-Pack监控Logstash实例,并将监控数据存储在安全的Elasticsearch集群中,则必须为具有适当权限的用户配置Logstash的用户名和密码。

除了为Logstash配置身份验证凭据之外,还需要授予授权用户访问Logstash索引的权限。

配置Logstash使用Basic Authentication

Logstash需要能够管理索引模板,创建索引,以及在它创建的索引中写入和删除文档。

为Logstash设置身份验证凭据:

  1. 使用Kibana的Management > Roles UI或角色API创建一个logstash_writer角色。对于集群权限,添加manage_index_templates和monitor。对于索引添加权限write, create, and create_index

  2. 在kibana的管理端中,添加一个logstash_internel用户,关联logstash_writer角色

  3. 将Logstash配置为您刚才创建的logstash_internal用户进行身份验证。您可以在Logstash.conf文件中为每个Elasticsearch插件分别配置凭据。例如:

    input {
      elasticsearch {
        ...
        user => logstash_internal
        password => x-pack-test-password
      }
    }
    filter {
      elasticsearch {
        ...
        user => logstash_internal
        password => x-pack-test-password
      }
    }
    output {
      elasticsearch {
        ...
        user => logstash_internal
        password => x-pack-test-password
      }
    }
    

发送数据到Elasticsearch

Elasticsearch输出插件可以在Elasticsearch中存储时间序列数据集(如日志、事件和指标)和非时间序列数据。

建议使用数据流选项将时间序列数据集(如日志、指标和事件)索引到Elasticsearch中:

Data stream 配置案例

示例:基本默认配置

output {
    elasticsearch {
        hosts => "hostname"
        data_stream => "true"
    }
}

这个例子展示了处理数据流的最小设置。与data_stream.* 事件字段被路由到相应的数据流。如果字段缺失,则将缺省路由到logs-generic-logstash。

例如:自定义数据流名称

output {
    elasticsearch {
        hosts => "hostname"
        data_stream => "true"
        data_stream_type => "metrics"
        data_stream_dataset => "foo"
        data_stream_namespace => "bar"
    }
}

写入不同的索引:最佳实践

如果你发送事件到相同的Elasticsearch集群,但是你的目标是不同的索引,你可以:

  • 使用不同的Elasticsearch输出,每个输出都有不同的索引参数值

  • 使用一个Elasticsearch输出,并使用动态变量替换索引参数

每个Elasticsearch输出都是一个连接到集群的新客户端:

  • 它必须初始化客户端并连接到Elasticsearch(如果你有更多的客户端,重启时间会更长)

  • 它有一个关联的连接池

为了最小化打开到Elasticsearch的连接数量,最大化批量大小并减少“小”批量请求的数量(这很容易填满队列),通常使用单个Elasticsearch输出更有效。

例子:

    output {
      elasticsearch {
        index => "%{[some_field][sub_field]}-%{+YYYY.MM.dd}"
      }
    }

如果事件中没有包含目标索引前缀的字段,该怎么办?

可以使用mutate筛选器和条件添加[@metadata]字段来为每个事件设置目标索引。[@metadata]字段将不会被发送到Elasticsearch。

例子:

  filter {
      if [log_type] in [ "test", "staging" ] {
        mutate { add_field => { "[@metadata][target_index]" => "test-%{+YYYY.MM}" } }
      } else if [log_type] == "production" {
        mutate { add_field => { "[@metadata][target_index]" => "prod-%{+YYYY.MM.dd}" } }
      } else {
        mutate { add_field => { "[@metadata][target_index]" => "unknown-%{+YYYY}" } }
      }
    }
    output {
      elasticsearch {
        index => "%{[@metadata][target_index]}"
      }
    }

一个实际使用中的样例:

input {
  file {
    path => "/tmp/b_log"
    start_position => "beginning"
  }
} 
filter {
 grok {
    patterns_dir => ["/opt/server/logstash-7.11.1/extrapattern"]
    match => { "message" => "%{LOGDATESTAMP:[@metadata][timestamp]}.*%{KQMARK:kqmark}\s*%{JAVALOGMESSAGE:javamessage}" }
  }
  date {
  	match => [ "[@metadata][timestamp]", "yyyy-MM-dd HH:mm:ss.SSS" ]
  	timezone => "Asia/Shanghai"
  	locale => "zh-CN"
  }
  json {
  	source => "javamessage"
  	remove_field => [ "javamessage" ]
  }
}
output {
  if [kqmark] == "kqlog" {
    elasticsearch {
      hosts => ["127.0.0.1:9200","127.0.0.1:9300","127.0.0.1:9400"]
      index => "logstash-%{+YYYY.MM.dd}"
      user => "logstash_internal"
      password => "123456"
    }
  }
}