当前位置: 首页 > news >正文

一等一网站参考消息网

一等一网站,参考消息网,做网站 做什么网站好,晓风彩票网站建设源代码授权目录 1、日志对我们来说到底重不重要#xff1f; 2、常见的日志收集方案 2.1 EFK 2.2 ELK Stack 2.3 ELKfilebeat 2.4 其他方案 2、elasticsearch组件介绍 3、filebeat组件介绍 3.1 filebeat和beat关系 3.2 filebeat是什么#xff1f; 3.3 Filebeat工作原理 3.4 …目录 1、日志对我们来说到底重不重要 2、常见的日志收集方案 2.1 EFK 2.2 ELK Stack 2.3 ELKfilebeat 2.4 其他方案 2、elasticsearch组件介绍 3、filebeat组件介绍 3.1 filebeat和beat关系 3.2 filebeat是什么 3.3 Filebeat工作原理 3.4 传输方案 4、logstash组件介绍 4.1 Logstash工作原理 5、fluentd组件介绍 6、fluentd、filebeat、logstash对比分析 7、安装elasticsearch组件 7.1 创建名称空间 7.2 查看kube-logging名称空间是否创建成功 7.3 安装elasticsearch组件 7.3.1 创建headless service服务 7.3.2 通过statefulset创建elasticsearch集群 7.3.2.1 创建Storageclass实现存储类动态供给 7.3.2.2 安装elasticsearch集群 8、安装kibana可视化UI界面 9、安装fluentd组件 10、测试收集pod容器日志 11、基于EFKlogstashkafka构建高吞吐量的日志收集平台 1、部署fluentd 2、接入kafka 3、配置logstash 4、启动logstash 文档中的YAML文件配置直接复制粘贴可能存在格式错误故实验中所需要的YAML文件以及本地包均打包至网盘 链接https://pan.baidu.com/s/15Ryaoa0_9ABQElLw9y28DA  提取码xdbm  实验环境 EFK安装在k8s集群k8s环境如下 k8s集群        k8s的控制节点 ip192.168.40.180 主机名xianchaomaster1 配置6vCPU/6Gi内存 k8s的工作节点 ip192.168.40.181 主机名xianchaonode1 配置6vCPU/8Gi内存 1、日志对我们来说到底重不重要 在生产环境或者测试环境如果某个服务或者业务组件出现问题如何定位和排查需要靠日志日志是定位问题的重要手段就像办案人员要根据现场留下的线索推断案情一样。 监控、日志企业必须具备的 日志打印的常见级别 日志打印通常有四种级别从高到底分别是ERROR、WARN、INFO、DEBUG。应该选用哪种级别是个很重要的问题。 日志级别中的优先级是什么意思在你的系统中如果开启了某一级别的日志后就不会打印比它级别低的日志。例如程序如果开启了INFO级别日志DEBUG日志就不会打印通常在生产环境中开启INFO日志。 1、DEBUG DEBU可以打印出最详细的日志信息主要用于开发过程中打印一些运行信息。 2、INFO INFO可以打印一些你感兴趣的或者重要的信息这个可以用于生产环境中输出程序运行的一些重要信息但是不能滥用避免打印过多的日志。 3、WARNING WARNING 表明发生了一些暂时不影响运行的错误会出现潜在错误的情形有些信息不是错误信息但是也要给程序员的一些提示  4、ERROR ERROR 可以打印错误和异常信息如果不想输出太多的日志可以使用这个级别这一级就是比较重要的错误了软件的某些功能已经不能继续执行了。 那么应该打印什么级别的日志呢?首先我们应该明确谁在看日志。 通常来说系统出了问题客户不会进到系统对着控制台查看日志输出所以日志所面对的主体对象必然是软件开发人员(包括测试测试、维护人员)。 下面我们假设几种场景来帮助我们理解日志级别 程序开发结束后交由给测试人员进行测试测试人员根据测试用例发现某个用例的输出和预期不符此时他的一反应该是查看日志。此时的日志是INFO级别日志不会出现DEBUG级别的日志现在就需要根据日志打印分为两种情况决定他下一步操作 通过查看INFO日志发现是由于自己操作失误造成了程序结果和预期不符合这种情况不是程序出错所以并不是bug不需要开发人员到场。 通过查看INFO日志发现自己的操作正确根据INFO日志查看并不是操作失误造成这个时候就需要开发人员到场确认。 以上两种情况是理想情况测试人员仅根据INFO级别的日志就能判断出程序的输出结果与预期不符是因为自己操作失误还是程序bug。更为现实的情况实际是测试人员并不能根据INFO级别的日志判断是否是自己失误还是程序bug。 综上INFO级别的日志应该是能帮助测试人员判断这是否是一个真正的bug而不是自己操作失误造成的。假设测试人员现在已经初步判断这是一个bug并且这个bug不那么明显此时就需要开发人员到场确认。开发人员到达现场后第一步应该是查看INFO日志初步作判断验证测试人员的看法接着如果不能判断出问题所在则应该是将日志级别调整至DEBUG级别打印出DEBUG级别的日志通过DEBUG日志来分析定位bug出在哪里。所以DEBUG级别的日志应该是能帮助开发人员分析定位bug所在的位置。 ERROR和WARN的级别都比INFO要高所以在设定日志级别在INFO时这两者的日志也会被打印。根据上面INFO和DEBUG级别的区别以及适用人员可以知道ERROR和WARN是同时给测试和开发、运维观察的。WARN级别称之为“警告”这个“警告”实际上就有点含糊了它不算错你可以选择忽视它但也可以选择重视它。例如现在一个WARN日志打出这么一条日志“系统有崩溃的风险”这个时候就需要引起足够的重视它代表现在不会崩溃但是它有崩溃的风险。或者出现“某用户在短时间内将密码输出很多次过后才进入了系统”这个时候是不是系统被暴力破解了呢?等等这个级别日志如同它的字面含义给你一个警告你可以选择忽视也可以重视但至少它现在不会给系统带来其他影响。 ERROR级别称之为“错误”这个含义就更明显了就是系统出现了错误需要处理。为常见的就是捕获异常时所打印的日志。 2、常见的日志收集方案 常见的进行日志分析的方法直接在日志文件中grep、awk就可以获得自己想要的信息。但在规模较大的场景中此方法效率低下面临问题包括日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询。需要集中化的日志管理把所有服务器上的日志收集汇总。常见解决思路是建立集中式日志收集系统将所有节点上的日志统一收集管理访问。 大型系统是一个分布式部署的架构不同的服务模块部署在不同的服务器上问题出现时大部分情况需要根据问题暴露的关键信息定位到具体的服务器和服务模块构建一套集中式日志系统可以提高定位问题的效率。 对大量的日志业务数据进行分析如平台PV、UV、IP、PageTOP等维度进行分析查询等。另外安全审计、数据挖掘、行为分析等都少不了日志对其作为支撑。 2.1 EFK 在Kubernetes集群上运行多个服务和应用程序时日志收集系统可以帮助你快速分类和分析由Pod生成的大量日志数据。Kubernetes中比较流行的日志收集解决方案是Elasticsearch、Fluentd和KibanaEFK技术栈也是官方推荐的一种方案。 Elasticsearch是一个实时的分布式的可扩展的搜索引擎它允许进行全文本和结构化搜索以及对日志进行分析。它通常用于索引和搜索大量日志数据也可以用于搜索许多不同种类的文档。 Elasticsearch通常与Kibana一起部署kibana可以把Elasticsearch采集到的数据通过dashboard仪表板可视化展示出来。Kibana允许你通过Web界面浏览Elasticsearch日志数据也可自定义查询条件快速检索出elasticccsearch中的日志数据。 Fluentd是一个流行的开源数据收集器我们在 Kubernetes 集群节点上安装 Fluentd通过获取容器日志文件、过滤和转换日志数据然后将数据传递到 Elasticsearch 集群在该集群中对其进行索引和存储。 2.2 ELK Stack E - Elasticsearch简称ES L - Logstash K - Kibana Elasticsearch日志存储和搜索引擎它的特点有分布式零配置自动发现索引自动分片索引副本机制restful风格接口多数据源自动搜索负载等。 Logstash是一个完全开源的工具他可以对你的日志进行收集、过滤并将其存储供以后使用支持动态的从各种数据源搜集数据并对数据进行过滤、分析、丰富、统一格式等操作。。 Kibana 也是一个开源和免费的工具Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面可以帮助您汇总、分析和搜索重要数据日志。 流程 应用程序AppServer–Logstash–ElasticSearch–Kibana–浏览器Browser Logstash收集AppServer产生的Log并存放到ElasticSearch集群中而Kibana则从ElasticSearch集群中查询数据生成图表再返回给Browser。 考虑到聚合端日志处理、清洗等负载问题和采集端传输效率一般在日志量比较大的时候在采集端和聚合端增加队列以用来实现日志消峰。 2.3 ELKfilebeat Filebeat采集— Logstash聚合、处理— ElasticSearch 存储—Kibana 展示 2.4 其他方案 ELK日志流程可以有多种方案不同组件可自由组合根据自身业务配置常见有以下 Logstash采集、处理— ElasticSearch 存储—Kibana 展示 Logstash采集— Logstash聚合、处理— ElasticSearch 存储—Kibana 展示 Filebeat采集、处理— ElasticSearch 存储—Kibana 展示 Filebeat采集— Logstash聚合、处理— ElasticSearch 存储—Kibana 展示 Filebeat采集— Kafka/Redis(消峰) — Logstash聚合、处理— ElasticSearch 存储—Kibana 展示 2、elasticsearch组件介绍 Elasticsearch 是一个分布式的免费开源搜索和分析引擎适用于包括文本、数字、地理空间、结构化和非结构化数据等在内的所有类型的数据。Elasticsearch 在 Apache Lucene 的基础上开发而成由 Elasticsearch N.V.即现在的 Elastic于 2010 年首次发布。Elasticsearch 以其简单的 REST 风格 API、分布式特性、速度和可扩展性而闻名是 Elastic Stack 的核心组件Elastic Stack 是一套适用于数据采集、扩充、存储、分析和可视化的免费开源工具。人们通常将 Elastic Stack 称为 ELK Stack代指 Elasticsearch、Logstash 和 Kibana目前 Elastic Stack 包括一系列丰富的轻量型数据采集代理这些代理统称为 Beats可用来向 Elasticsearch 发送数据。 3、filebeat组件介绍 3.1 filebeat和beat关系 filebeat是Beats中的一员。Beats是一个轻量级日志采集器Beats家族有6个成员早期的ELK架构中使用Logstash收集、解析日志但是Logstash对内存、cpu、io等资源消耗比较高。相比LogstashBeats所占系统的CPU和内存几乎可以忽略不计。 目前Beats包含六种工具 1、Packetbeat网络数据收集网络流量数据 2、Metricbeat指标收集系统、进程和文件系统级别的CPU和内存使用情况等数据 3、Filebeat日志文件收集文件数据 4、Winlogbeatwindows事件日志收集Windows事件日志数据 5、Auditbeat审计数据收集审计日志 6、Heartbeat运行时间监控收集系统运行时的数据 3.2 filebeat是什么 Filebeat是用于转发和收集日志数据的轻量级传送工具。Filebeat监视你指定的日志文件或位置收集日志事件并将它们转发到Elasticsearch或 Logstash中。 Filebeat的工作方式如下启动Filebeat时它将启动一个或多个输入这些输入将在为日志数据指定的位置中查找。对于Filebeat所找到的每个日志Filebeat都会启动收集器。每个收集器都读取单个日志以获取新内容并将新日志数据发送到libbeatlibbeat将聚集事件并将聚集的数据发送到为Filebeat配置的输出。 工作的流程图如下 Filebeat 有两个主要组件 harvester一个harvester负责读取一个单个文件的内容。harvester逐行读取每个文件并把这些内容发送到输出。每个文件启动一个harvester。 Input一个input负责管理harvesters并找到所有要读取的源。如果input类型是log则input查找驱动器上与已定义的log日志路径匹配的所有文件并为每个文件启动一个harvester。 3.3 Filebeat工作原理 在任何环境下应用程序都有停机的可能性。 Filebeat 读取并转发日志行如果中断则会记住所有事件恢复联机状态时所在位置。 Filebeat带有内部模块auditdApacheNginxSystem和MySQL可通过一个指定命令来简化通用日志格式的收集解析和可视化。 FileBeat 不会让你的管道超负荷。FileBeat 如果是向 Logstash 传输数据当 Logstash 忙于处理数据会通知 FileBeat 放慢读取速度。一旦拥塞得到解决FileBeat将恢复到原来的速度并继续传播。 Filebeat保持每个文件的状态并经常刷新注册表文件中的磁盘状态。状态用于记住harvester正在读取的最后偏移量并确保发送所有日志行。Filebeat将每个事件的传递状态存储在注册表文件中。所以它能保证事件至少传递一次到配置的输出没有数据丢失。 3.4 传输方案 1、output.elasticsearch 如果你希望使用 filebeat 直接向 elasticsearch 输出数据需要配置 output.elasticsearch output.elasticsearch: hosts: [192.168.40.180:9200] 2、output.logstash 如果使用filebeat向 logstash输出数据然后由 logstash 再向elasticsearch 输出数据需要配置 output.logstash。 logstash 和 filebeat 一起工作时如果 logstash 忙于处理数据会通知FileBeat放慢读取速度。一旦拥塞得到解决FileBeat 将恢复到原来的速度并继续传播。这样可以减少管道超负荷的情况。 output.logstash: hosts: [192.168.40.180:5044]   3、output.kafka 如果使用filebeat向kafka输出数据然后由 logstash 作为消费者拉取kafka中的日志并再向elasticsearch 输出数据需要配置 output.logstash output.kafka: enabled: true hosts: [192.168.40.180:9092] topic: elfk8stest 4、logstash组件介绍 Logstash是一个开源数据收集引擎具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来并将数据标准化到你所选择的目的地。Logstash 是一个应用程序日志、事件的传输、处理、管理和搜索的平台。你可以用它来统一对应用程序日志进行收集管理提供 Web 接口用于查询和统计。 输入采集各种样式、大小和来源的数据 数据往往以各种各样的形式或分散或集中地存在于很多系统中。Logstash 支持各种输入选择 可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式轻松地从你的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。 过滤器实时解析和转换数据 数据从源传输到存储库的过程中Logstash 过滤器能够解析各个事件识别已命名的字段以构建结构并将它们转换成通用格式以便更轻松、更快速地分析和实现商业价值。 Logstash 能够动态地转换和解析数据不受格式或复杂度的影响 1、利用 Grok 从非结构化数据中派生出结构 2、从 IP 地址破译出地理坐标 3、将 PII 数据匿名化完全排除敏感字段 4、整体处理不受数据源、格式或架构的影响 输出选择你的存储导出你的数据 尽管 Elasticsearch 是我们的首选输出方向能够为我们的搜索和分析带来无限可能但它并非唯一选择。Logstash 提供众多输出选择你可以将数据发送到你要指定的地方。  4.1 Logstash工作原理 Logstash 有两个必要元素input 和 output 一个可选元素filter。 这三个元素分别代表 Logstash 事件处理的三个阶段输入 过滤器 输出 Input负责从数据源采集数据。 filter 将数据修改为你指定的格式或内容。 output 将数据传输到目的地。 在实际应用场景中通常输入、输出、过滤器不止一个。Logstash 的这三个元素都使用插件式管理方式可以根据应用需要灵活的选用各阶段需要的插件并组合使用。 常用input模块 Logstash 支持各种输入选择 可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式可从日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。 file从文件系统上的文件读取 syslog在众所周知的端口514上侦听系统日志消息并根据RFC3164格式进行解析 redis从redis服务器读取使用redis通道和redis列表。 Redis经常用作集中式Logstash安装中的“代理”它将接收来自远程Logstash“托运人”的Logstash事件排队。 beats处理由Filebeat发送的事件。 常用的filter模块 过滤器是Logstash管道中的中间处理设备。可以将条件过滤器组合在一起对事件执行操作。 grok解析和结构任意文本。 Grok目前是Logstash中将非结构化日志数据解析为结构化和可查询的最佳方法。 mutate对事件字段执行一般转换。可以重命名删除替换和修改事件中的字段。 drop完全放弃一个事件例如调试事件。 clone制作一个事件的副本可能会添加或删除字段。 geoip添加有关IP地址的地理位置的信息 常用output elasticsearch将事件数据发送给 Elasticsearch推荐模式。 file将事件数据写入文件或磁盘。 graphite将事件数据发送给 graphite一个流行的开源工具存储和绘制指标 http://graphite.readthedocs.io/en/latest/。 statsd将事件数据发送到 statsd 这是一种侦听统计数据的服务如计数器和定时器通过UDP发送并将聚合发送到一个或多个可插入的后端服务。 常用code插件 json以JSON格式对数据进行编码或解码。 multiline将多行文本事件如java异常和堆栈跟踪消息合并为单个事件。 input { kafka { bootstrap_servers 192.168.40.180:9092 auto_offset_reset latest consumer_threads 5 decorate_events true topics [elktest] } } output { elasticsearch { hosts [192.168.40.180:9200] index elkk8stest-%{YYYY.MM.dd} } }         5、fluentd组件介绍 fluentd是一个针对日志的收集、处理、转发系统。通过丰富的插件系统可以收集来自于各种系统或应用的日志转化为用户指定的格式后转发到用户所指定的日志存储系统之中。 fluentd 常常被拿来和Logstash比较我们常说ELKL就是这个agent。fluentd 是随着Docker和es一起流行起来的agent。 fluentd 比 logstash 更省资源更轻量级的 fluent-bid 对应 filebeat作为部署在结点上的日志收集器fluentd 有更多强大、开放的插件数量和社区。插件多也非常灵活规则也不复杂。 6、fluentd、filebeat、logstash对比分析 常见的日志采集工具有Logstash、Filebeat、Fluentd、Logagent、rsyslog等等那么他们之间有什么区别呢?什么情况下我们应该用哪一种工具? Logstash Logstash是一个开源数据收集引擎具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来并将数据标准化到你所选择的目的地。 优势 Logstash 主要的优点就是它的灵活性主要因为它有很多插件详细的文档以及直白的配置格式让它可以在多种场景下应用。我们基本上可以在网上找到很多资源几乎可以处理任何问题。 劣势 Logstash 致命的问题是它的性能以及资源消耗(默认的堆大小是 1GB)。尽管它的性能在近几年已经有很大提升与它的替代者们相比还是要慢很多的。这里有 Logstash 与 rsyslog 性能对比以及Logstash 与 filebeat 的性能对比。它在大数据量的情况下会是个问题。 另一个问题是它目前不支持缓存目前的典型替代方案是将 Redis 或 Kafka 作为中心缓冲池 典型应用场景 因为 Logstash 自身的灵活性以及网络上丰富的资料Logstash 适用于原型验证阶段使用或者解析非常的复杂的时候。在不考虑服务器资源的情况下如果服务器的性能足够好我们也可以为每台服务器安装 Logstash 。我们也不需要使用缓冲因为文件自身就有缓冲的行为而 Logstash 也会记住上次处理的位置。 如果服务器性能较差并不推荐为每个服务器安装 Logstash 这样就需要一个轻量的日志传输工具将数据从服务器端经由一个或多个 Logstash 中心服务器传输到 Elasticsearch 随着日志项目的推进可能会因为性能或代价的问题需要调整日志传输的方式(log shipper)。当判断 Logstash 的性能是否足够好时重要的是对吞吐量的需求有着准确的估计这也决定了需要为 Logstash 投入多少硬件资源。 Filebeat 作为 Beats 家族的一员Filebeat 是一个轻量级的日志传输工具它的存在正弥补了 Logstash 的缺点Filebeat 作为一个轻量级的日志传输工具可以将日志推送到中心 Logstash。 在版本 5.x 中Elasticsearch 具有解析的能力(像 Logstash 过滤器)— Ingest。这也就意味着可以将数据直接用 Filebeat 推送到 Elasticsearch并让 Elasticsearch 既做解析的事情又做存储的事情。也不需要使用缓冲因为 Filebeat 也会和 Logstash 一样记住上次读取的偏移如果需要缓冲(例如不希望将日志服务器的文件系统填满)可以使用 Redis/Kafka因为 Filebeat 可以与它们进行通信。 优势 Filebeat 只是一个二进制文件没有任何依赖。它占用资源极少尽管它还十分年轻正式因为它简单所以几乎没有什么可以出错的地方所以它的可靠性还是很高的。它也为我们提供了很多可以调节的点例如它以何种方式搜索新的文件以及当文件有一段时间没有发生变化时何时选择关闭文件句柄。 劣势 Filebeat 的应用范围十分有限所以在某些场景下我们会碰到问题。例如如果使用 Logstash 作为下游管道我们同样会遇到性能问题。正因为如此Filebeat 的范围在扩大。开始时它只能将日志发送到 Logstash 和 Elasticsearch而现在它可以将日志发送给 Kafka 和 Redis在 5.x 版本中它还具备过滤的能力。 Fluentd Fluentd 创建的初衷主要是尽可能的使用 JSON 作为日志输出所以传输工具及其下游的传输线不需要猜测子字符串里面各个字段的类型。这样它为几乎所有的语言都提供库这也意味着我们可以将它插入到我们自定义的程序中。 优势 和多数 Logstash 插件一样Fluentd 插件是用 Ruby 语言开发的非常易于编写维护。所以它数量很多几乎所有的源和目标存储都有插件(各个插件的成熟度也不太一样)。这也意味这我们可以用 Fluentd 来串联所有的东西。 劣势 因为在多数应用场景下我们会通过 Fluentd 得到结构化的数据它的灵活性并不好。但是我们仍然可以通过正则表达式来解析非结构化的数据。尽管性能在大多数场景下都很好但它并不是最好的和 syslog-ng 一样它的缓冲只存在与输出端单线程核心以及 Ruby GIL 实现的插件意味着它大的节点下性能是受限的不过它的资源消耗在大多数场景下是可以接受的。对于小的或者嵌入式的设备可能需要看看 Fluent Bit它和 Fluentd 的关系与 Filebeat 和 Logstash 之间的关系类似。 典型应用场景 Fluentd 在日志的数据源和目标存储各种各样时非常合适因为它有很多插件。而且如果大多数数据源都是自定义的应用所以可以发现用 fluentd 的库要比将日志库与其他传输工具结合起来要容易很多。特别是在我们的应用是多种语言编写的时候即我们使用了多种日志库日志的行为也不太一样。 Logagent Logagent 是 Sematext 提供的传输工具它用来将日志传输到 Logsene(一个基于 SaaS 平台的 Elasticsearch API)因为 Logsene 会暴露 Elasticsearch API所以 Logagent 可以很容易将数据推送到 Elasticsearch 。 优势 可以获取 /var/log 下的所有信息解析各种格式(ElasticsearchSolrMongoDBApache HTTPD等等)它可以掩盖敏感的数据信息例如个人验证信息(PII)出生年月日信用卡号码等等。它还可以基于 IP 做 GeoIP 丰富地理位置信息(例如access logs)。同样它轻量又快速可以将其置入任何日志块中。在新的 2.0 版本中它以第三方 node.js 模块化方式增加了支持对输入输出的处理插件。重要的是 Logagent 有本地缓冲所以不像 Logstash 在数据传输目的地不可用时会丢失日志。 劣势 尽管 Logagent 有些比较有意思的功能(例如接收 Heroku 或 CloudFoundry 日志)但是它并没有 Logstash 灵活。 典型应用场景 Logagent 作为一个可以做所有事情的传输工具是值得选择的(提取、解析、缓冲和传输)。 logtail 阿里云日志服务的生产者目前在阿里集团内部机器上运行经过3年多时间的考验目前为阿里公有云用户提供日志收集服务。 采用C语言实现对稳定性、资源控制、管理等下过很大的功夫性能良好。相比于logstash、fluentd的社区支持logtail功能较为单一专注日志收集功能。 优势 logtail占用机器cpu、内存资源最少结合阿里云日志服务的E2E体验良好。 劣势 logtail目前对特定日志类型解析的支持较弱后续需要把这一块补起来。 rsyslog 绝大多数 Linux 发布版本默认的 syslog 守护进程rsyslog 可以做的不仅仅是将日志从 syslog socket 读取并写入 /var/log/messages 。它可以提取文件、解析、缓冲(磁盘和内存)以及将它们传输到多个目的地包括 Elasticsearch 。可以从此处找到如何处理 Apache 以及系统日志。 优势 rsyslog 是经测试过的最快的传输工具。如果只是将它作为一个简单的 router/shipper 使用几乎所有的机器都会受带宽的限制但是它非常擅长处理解析多个规则。它基于语法的模块(mmnormalize)无论规则数目如何增加它的处理速度始终是线性增长的。这也就意味着如果当规则在 20-30 条时如解析 Cisco 日志时它的性能可以大大超过基于正则式解析的 grok 达到 100 倍(当然这也取决于 grok 的实现以及 liblognorm 的版本)。 它同时也是我们能找到的最轻的解析器当然这也取决于我们配置的缓冲。 劣势 rsyslog 的配置工作需要更大的代价(这里有一些例子)这让两件事情非常困难 文档难以搜索和阅读特别是那些对术语比较陌生的开发者。 5.x 以上的版本格式不太一样(它扩展了 syslogd 的配置格式同时也仍然支持旧的格式)尽管新的格式可以兼容旧格式但是新的特性(例如Elasticsearch 的输出)只在新的配置下才有效然后旧的插件(例如Postgres 输出)只在旧格式下支持。 尽管在配置稳定的情况下rsyslog 是可靠的(它自身也提供多种配置方式最终都可以获得相同的结果)它还是存在一些 bug 。 典型应用场景 rsyslog 适合那些非常轻的应用(应用小VMDocker容器)。如果需要在另一个传输工具(例如Logstash)中进行处理可以直接通过 TCP 转发 JSON 或者连接 Kafka/Redis 缓冲。 rsyslog 还适合我们对性能有着非常严格的要求时特别是在有多个解析规则时。那么这就值得为之投入更多的时间研究它的配置。 重点Logstash、fluentd、filebeat 实验需要的课件说明 下文需要的资源清单yaml文件和离线镜像压缩包都在课件里可上传自己机器上直接使用即可 7、安装elasticsearch组件 7.1 创建名称空间 在安装Elasticsearch集群之前我们先创建一个名称空间在这个名称空间下安装日志收工具elasticsearch、fluentd、kibana。我们创建一个kube-logging名称空间将EFK组件安装到该名称空间中。 [rootxianchaomaster1]# kubectl apply -f kube-logging.yaml kube-logging.yaml文件内容如下 kind: Namespace apiVersion: v1 metadata: name: kube-logging 7.2 查看kube-logging名称空间是否创建成功 [rootxianchaomaster1 efk]# kubectl get namespaces | grep kube-logging #显示如下说明创建成功 kube-logging      Active   86s 7.3 安装elasticsearch组件 通过上面步骤已经创建了一个名称空间kube-logging在这个名称空间下去安装日志收集组件efk首先我们需要部署一个有3个节点的Elasticsearch集群。我们使用3个Elasticsearch Pods可以避免高可用中的多节点群集中发生的“脑裂”的问题。 Elasticsearch脑裂可参考如下https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html#split-brain 7.3.1 创建headless service服务 [rootxianchaomaster1]# kubectl apply -f elasticsearch_svc.yaml elasticsearch_svc.yaml文件内容如下 kind: Service apiVersion: v1 metadata: name: elasticsearch namespace: kube-logging labels: app: elasticsearch spec: selector: app: elasticsearch clusterIP: None ports: - port: 9200 name: rest - port: 9300 name: inter-node 查看elasticsearch的service是否创建成功 [rootxianchaomaster1]# kubectl get services --namespacekube-logging 看到如下说明在kube-logging名称空间下创建了一个名字是elasticsearch的headless service NAME            TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)             AGE elasticsearch   ClusterIP   None         none        9200/TCP,9300/TCP   2m 现在我们已经为 Pod 设置了无头服务和一个稳定的域名.elasticsearch.kube-logging.svc.cluster.local接下来我们通过 StatefulSet来创建具体的 Elasticsearch的Pod 应用。 7.3.2 通过statefulset创建elasticsearch集群 7.3.2.1 创建Storageclass实现存储类动态供给 #安装nfs服务选择k8s集群的xianchaomaster1节点k8s集群的xianchaomaster1节点的ip是192.168.40.180 #yum安装nfs [rootxianchaomaster1 ~]# yum install nfs-utils -y [rootxianchaonode1 ~]# yum install nfs-utils -y #启动nfs服务 [rootxianchaomaster1 ~]# systemctl start nfs [rootxianchaonode1 ~]# systemctl start nfs #设置nfs开机自启动 [rootxianchaomaster1 ~]# systemctl enable nfs.service [rootxianchaonode1 ~]# systemctl enable nfs.service #在xianchaomaster1上创建一个nfs共享目录 [rootxianchaomaster1 ~]# mkdir /data/v1 -p #编辑/etc/exports文件 [rootxianchaomaster1 ~]# vim /etc/exports /data/v1 *(rw,no_root_squash) #加载配置使配置生效 [rootxianchaomaster1 ~]# exportfs -arv [rootxianchaomaster1 ~]# systemctl restart nfs #创建nfs作为存储的供应商 1、创建运行nfs-provisioner需要的sa账号 [rootxianchaomaster1 nfs]# cat serviceaccount.yaml apiVersion: v1 kind: ServiceAccount metadata: name: nfs-provisioner [rootxianchaomaster1 nfs]# kubectl apply -f serviceaccount.yaml serviceaccount/nfs-provisioner created 扩展什么是sa sa的全称是serviceaccount。 serviceaccount是为了方便Pod里面的进程调用Kubernetes API或其他外部服务而设计的。 指定了serviceaccount之后我们把pod创建出来了我们在使用这个pod时这个pod就有了我们指定的账户的权限了。 2、对sa授权  [rootxianchaomaster1]# kubectl create clusterrolebinding nfs-provisioner-clusterrolebinding --clusterrolecluster-admin --serviceaccountdefault:nfs-provisioner #把nfs-subdir-external-provisioner.tar.gz上传到xianchaonode1上手动解压。 [rootxianchaonode1 ~]# docker load -i nfs-subdir-external-provisioner.tar.gz #通过deployment创建pod用来运行nfs-provisioner [rootxianchaomaster1]# kubectl apply -f deployment.yaml deployment.yaml文件解释说明 kind: Deployment apiVersion: apps/v1 metadata: name: nfs-provisioner spec: selector: matchLabels: app: nfs-provisioner replicas: 1 strategy: type: Recreate template: metadata: labels: app: nfs-provisioner spec: serviceAccount: nfs-provisioner containers: - name: nfs-provisioner image: registry.cn-beijing.aliyuncs.com/mydlq/nfs-subdir-external-provisioner:v4.0.0 imagePullPolicy: IfNotPresent volumeMounts: - name: nfs-client-root mountPath: /persistentvolumes env: - name: PROVISIONER_NAME value: example.com/nfs - name: NFS_SERVER value: 192.168.40.180 #这个需要写nfs服务端所在的ip地址大家需要写自己安装了nfs服务的机器ip - name: NFS_PATH value: /data/v1 #这个是nfs服务端共享的目录 volumes: - name: nfs-client-root nfs: server: 192.168.40.180 path: /data/v1 #验证nfs是否创建成功 [rootxianchaomaster1]# kubectl get pods | grep nfs #显示如下说明创建成功 nfs-provisioner-5975849bb4-92dhq   1/1     Running     3          11h 7.3.2.2 安装elasticsearch集群 #创建stoorageclass [rootxianchaomaster1]# kubectl apply -f class.yaml class.yaml文件内容如下 apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: do-block-storage provisioner: example.com/nfs 注 provisioner:  example.com/nfs    #该值需要和nfs provisioner配置的PROVISIONER_NAME处的value值保持一致 #把elasticsearch_7_2_0.tar.gz和busybox.tar.gz 上传到xianchaonode1手动解压 [rootxianchaonode1 ~]# docker load -i elasticsearch_7_2_0.tar.gz [rootxianchaonode1 ~]# docker load -i busybox.tar.gz #更新资源清单文件 [rootxianchaomaster1]# kubectl apply -f elasticsearch-statefulset.yaml elasticsearch-statefulset.yaml文件解释说明 apiVersion: apps/v1 kind: StatefulSet metadata: name: es-cluster namespace: kube-logging spec: serviceName: elasticsearch replicas: 3 selector: matchLabels: app: elasticsearch template: metadata: labels: app: elasticsearch 上面内容的解释在kube-logging的名称空间中定义了一个es-cluster的StatefulSet。然后我们使用serviceName 字段与我们之前创建的headless ElasticSearch服务相关联。这样可以确保可以使用以下DNS地址访问StatefulSet中的每个Podes-cluster-[0,1,2].elasticsearch.kube-logging.svc.cluster.local其中[0,1,2]与Pod分配的序号数相对应。我们指定3个replicas3个Pod副本将selector matchLabels 设置为app: elasticseach。该.spec.selector.matchLabels和.spec.template.metadata.labels字段必须匹配。 2statefulset中定义pod模板内容如下 . . . spec: containers: - name: elasticsearch image: docker.elastic.co/elasticsearch/elasticsearch:7.2.0 imagePullPolicy: IfNotPresent resources: limits: cpu: 1000m requests: cpu: 100m ports: - containerPort: 9200 name: rest protocol: TCP - containerPort: 9300 name: inter-node protocol: TCP volumeMounts: - name: data mountPath: /usr/share/elasticsearch/data env: - name: cluster.name value: k8s-logs - name: node.name valueFrom: fieldRef: fieldPath: metadata.name - name: discovery.seed_hosts value: es-cluster-0.elasticsearch,es-cluster-1.elasticsearch,es-cluster-2.elasticsearch - name: cluster.initial_master_nodes value: es-cluster-0,es-cluster-1,es-cluster-2 - name: ES_JAVA_OPTS value: -Xms512m -Xmx512m 上面内容解释在statefulset中定义了pod容器的名字是elasticsearch镜像是docker.elastic.co/elasticsearch/elasticsearch:7.2.0。使用resources字段来指定容器至少需要0.1个vCPU并且容器最多可以使用1个vC​​PU了解有关资源请求和限制可参考Resource Management for Pods and Containers | Kubernetes。 容器暴露了9200和9300两个端口名称要和上面定义的 Service 保持一致通过volumeMount声明了数据持久化目录定义了一个data数据卷通过volumeMount把它挂载到容器里的/usr/share/elasticsearch/data目录。 容器中设置了一些环境变量 cluster.nameElasticsearch 集群的名称我们这里是 k8s-logs。 node.name节点的名称通过metadata.name来获取。这将解析为 es-cluster-[0,1,2]取决于节点的指定顺序。 discovery.seed_hosts此字段用于设置在Elasticsearch集群中节点相互连接的发现方法它为我们的集群指定了一个静态主机列表。由于我们之前配置的是无头服务我们的 Pod 具有唯一的 DNS 地址es-cluster-[0,1,2].elasticsearch.kube-logging.svc.cluster.local因此我们相应地设置此地址变量即可。由于都在同一个 namespace 下面所以我们可以将其缩短为es-cluster-[0,1,2].elasticsearch。 要了解有关 Elasticsearch 发现的更多信息请参阅 Elasticsearch 官方文档Discovery and cluster formation | Elasticsearch Guide [8.15] | Elastic。 。 ES_JAVA_OPTS这里我们设置为-Xms512m -Xmx512m告诉JVM使用512 MB的最小和最大堆。这个值应该根据群集的资源可用性和需求调整这些参数。 要了解更多信息请参阅设置堆大小的相关文档Heap size settings | Elasticsearch Guide [8.15] | Elastic。 3initcontainer内容 . . . initContainers: - name: fix-permissions image: busybox command: [sh, -c, chown -R 1000:1000 /usr/share/elasticsearch/data] securityContext: privileged: true volumeMounts: - name: data mountPath: /usr/share/elasticsearch/data - name: increase-vm-max-map image: busybox command: [sysctl, -w, vm.max_map_count262144] securityContext: privileged: true - name: increase-fd-ulimit image: busybox command: [sh, -c, ulimit -n 65536] securityContext: privileged: true 这里我们定义了几个在主应用程序之前运行的 Init 容器这些初始容器按照定义的顺序依次执行执行完成后才会启动主应用容器。 第一个名为 fix-permissions 的容器用来运行 chown 命令将 Elasticsearch 数据目录的用户和组更改为1000:1000Elasticsearch 用户的 UID。因为默认情况下Kubernetes 用 root 用户挂载数据目录这会使得 Elasticsearch 无法访问该数据目录可以参考 Elasticsearch 生产中的一些默认注意事项相关文档说明https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html#_notes_for_production_use_and_defaults。 第二个名为 increase-vm-max-map 的容器用来增加操作系统对mmap计数的限制默认情况下该值可能太低导致内存不足的错误要了解更多关于该设置的信息可以查看 Elasticsearch 官方文档说明Virtual memory | Elasticsearch Guide [8.15] | Elastic。 最后一个初始化容器是用来执行ulimit命令增加打开文件描述符的最大数量的。 此外 Elastisearch Notes for Production Use 文档还提到了由于性能原因最好禁用 swap对于 Kubernetes 集群而言最好也是禁用 swap 分区的。 现在我们已经定义了主应用容器和它之前运行的 Init Containers 来调整一些必要的系统参数接下来可以添加数据目录的持久化相关的配置。 4在 StatefulSet 中使用volumeClaimTemplates来定义volume 模板即可 . . . volumeClaimTemplates: - metadata: name: data labels: app: elasticsearch spec: accessModes: [ ReadWriteOnce ] storageClassName: do-block-storage resources: requests: storage: 10Gi 我们这里使用 volumeClaimTemplates 来定义持久化模板Kubernetes 会使用它为 Pod 创建 PersistentVolume设置访问模式为ReadWriteOnce这意味着它只能被 mount 到单个节点上进行读写然后最重要的是使用了一个名为do-block-storage的 StorageClass 对象所以我们需要提前创建该对象我们这里使用的 NFS 作为存储后端所以需要安装一个对应的nfs  provisioner 驱动。 #查看es的pod是否创建成功 [rootxianchaomaster1]# kubectl get pods -n kube-logging 显示如下说明es集群创建成功了 NAME           READY   STATUS    RESTARTS   AGE es-cluster-0   1/1     Running   0          55s es-cluster-1   1/1     Running   0          35s es-cluster-2   1/1     Running   0          28s [rootxianchaomaster1]# kubectl get svc -n kube-logging NAME            TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)              elasticsearch   ClusterIP   None         none        9200/TCP,9300/TCP 8、安装kibana可视化UI界面 #把kibana_7_2_0.tar.gz上传到xianchaonode1节点手动解压 [rootxianchaonode1 ~]# docker load -i kibana_7_2_0.tar.gz #更新资源清单文件 [rootxianchaomaster]# kubectl apply -f kibana.yaml kibana.yaml资源清单文件解读 apiVersion: v1 kind: Service metadata: name: kibana namespace: kube-logging labels: app: kibana spec: type: NodePort ports: - port: 5601 selector: app: kibana --- apiVersion: apps/v1 kind: Deployment metadata: name: kibana namespace: kube-logging labels: app: kibana spec: replicas: 1 selector: matchLabels: app: kibana template: metadata: labels: app: kibana spec: containers: - name: kibana image: docker.elastic.co/kibana/kibana:7.2.0 imagePullPolicy: IfNotPresent resources: limits: cpu: 1000m requests: cpu: 100m env: - name: ELASTICSEARCH_URL value: http://elasticsearch.kube-logging.svc.cluster.local:9200 ports: - containerPort: 5601 上面我们定义了两个资源对象一个 Service 和 Deployment为了测试方便我们将 Service 设置为了 NodePort 类型Kibana Pod 中配置都比较简单唯一需要注意的是我们使用 ELASTICSEARCH_URL 这个环境变量来设置Elasticsearch 集群的端点和端口直接使用 Kubernetes DNS 即可此端点对应服务名称为 elasticsearch由于是一个 headless service所以该域将解析为3个 Elasticsearch Pod 的 IP 地址列表。 [rootxianchaomaster1]#kubectl get pods -n kube-logging 显示如下看到kibana开头的pod说明kibana也已经部署成功了 NAME                      READY   STATUS    RESTARTS   AGE es-cluster-0              1/1     Running   0          170m es-cluster-1              1/1     Running   0          170m es-cluster-2              1/1     Running   0          170m kibana-5749b5778b-c9djr   1/1     Running   0          4m3s [rootxianchaomaster1 efk]# kubectl get svc -n kube-logging NAME            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)              elasticsearch   ClusterIP   None             none        9200/TCP,9300/TCP    kibana          NodePort    10.106.209.160   none        5601:31966/TCP       在浏览器中打开http://任意节点IP:31966即可如果看到如下欢迎界面证明 Kibana 已经成功部署到了Kubernetes集群之中。 9、安装fluentd组件 我们使用daemonset控制器部署fluentd组件这样可以保证集群中的每个节点都可以运行同样fluentd的pod副本这样就可以收集k8s集群中每个节点的日志在k8s集群中容器应用程序的输入输出日志会重定向到node节点里的json文件中fluentd可以tail和过滤以及把日志转换成指定的格式发送到elasticsearch集群中。除了容器日志fluentd也可以采集kubelet、kube-proxy、docker的日志。 # 离线镜像压缩包fluentd.tar.gz上传到xianchaonode1和xianchaomaster1上手动解压 [rootxianchaonode1 ~]# docker load -i fluentd.tar.gz [rootxianchaomaster1 ~]# docker load -i fluentd.tar.gz #更新资源清单文件 [rootxianchaomaster]# kubectl apply -f fluentd.yaml #查看fluentd是否部署成功 [rootxianchaomaster1]# kubectl get pods -n kube-logging 显示如下看到fluentd的pod的status状态是running说明部署成功 NAME                 READY   STATUS    RESTARTS   es-cluster-0             1/1      Running   0           es-cluster-1             1/1      Running   0           es-cluster-2             1/1      Running   0          fluentd-dszg7           1/1      Running   0          fluentd-dwk7t           1/1      Running   0        kibana-84cf7f59c-2546x  1/1     Running   0        Fluentd 启动成功后我们可以前往 Kibana 的 Dashboard 页面中 点击Try our sample data 点击左侧的Discover 可以看到如下配置页面 在这里可以配置我们需要的 Elasticsearch 索引前面 Fluentd 配置文件中我们采集的日志使用的是 logstash 格式这里只需要在文本框中输入logstash-*即可匹配到 Elasticsearch 集群中的所有日志数据 点击Next step 选择timestamp创建索引 点击左侧的discover可看到如下 10、测试收集pod容器日志 #把busybox.tar.gz上传到xianchaonode1节点手动解压: [rootxianchaonode1 ~]# docker load -i busybox.tar.gz [rootxianchaomaster1]# kubectl apply -f pod.yaml Kibana查询语言KQL官方地址 https://www.elastic.co/guide/en/kibana/7.2/kuery-query.html 登录到kibana的控制面板在discover处的搜索栏中输入kubernetes.pod_name:counter这将过滤名为的Pod的日志数据counter如下所示 通过上面几个步骤我们已经在k8s集群成功部署了elasticsearchfluentdkibana这里使用的efk系统包括3个Elasticsearch Pod一个Kibana Pod和一组作为DaemonSet部署的Fluentd Pod。 11、基于EFKlogstashkafka构建高吞吐量的日志收集平台 fluentd--kafka--logstash--elasticsearch--kibana 1、部署fluentd https://github.com/kubernetes/kubernetes/blob/master/cluster/addons/fluentd-elasticsearch/fluentd-es-ds.yaml [rootxianchaomaster1 ~]# cat fluentd-configmap.yaml kind: ConfigMap apiVersion: v1 metadata: name: fluentd-config namespace: logging labels: addonmanager.kubernetes.io/mode: Reconcile data: system.conf: |- system root_dir /tmp/fluentd-buffers/ /system containers.input.conf: |- source id fluentd-containers.log type tail path /var/log/containers/*.log pos_file /var/log/es-containers.log.pos time_format %Y-%m-%dT%H:%M:%S.%NZ localtime tag raw.kubernetes.* format json read_from_head true /source # Detect exceptions in the log output and forward them as one log entry. match raw.kubernetes.** id raw.kubernetes type detect_exceptions remove_tag_prefix raw message log stream stream multiline_flush_interval 5 max_bytes 500000 max_lines 1000 /match system.input.conf: |- # Logs from systemd-journal for interesting services. source id journald-docker type systemd filters [{ _SYSTEMD_UNIT: docker.service }] storage type local persistent true /storage read_from_head true tag docker /source source id journald-kubelet type systemd filters [{ _SYSTEMD_UNIT: kubelet.service }] storage type local persistent true /storage read_from_head true tag kubelet /source forward.input.conf: |- # Takes the messages sent over TCP source type forward /source output.conf: |- # Enriches records with Kubernetes metadata filter kubernetes.** type kubernetes_metadata /filter match ** id elasticsearch type elasticsearch log_level info include_tag_key true host es主机ip port 9200 logstash_format true request_timeout    30s buffer type file path /var/log/fluentd-buffers/kubernetes.system.buffer flush_mode interval retry_type exponential_backoff flush_thread_count 2 flush_interval 5s retry_forever retry_max_interval 30 chunk_limit_size 2M queue_limit_length 8 overflow_action block /buffer /match [rootxianchaomaster1 ~]# cat fluentd-daemonset.yaml apiVersion: v1 kind: ServiceAccount metadata: name: fluentd-es namespace: logging labels: k8s-app: fluentd-es kubernetes.io/cluster-service: true addonmanager.kubernetes.io/mode: Reconcile --- kind: ClusterRole apiVersion: rbac.authorization.k8s.io/v1 metadata: name: fluentd-es labels: k8s-app: fluentd-es kubernetes.io/cluster-service: true addonmanager.kubernetes.io/mode: Reconcile rules: - apiGroups: - resources: - namespaces - pods verbs: - get - watch - list --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: name: fluentd-es labels: k8s-app: fluentd-es kubernetes.io/cluster-service: true addonmanager.kubernetes.io/mode: Reconcile subjects: - kind: ServiceAccount name: fluentd-es namespace: logging apiGroup: roleRef: kind: ClusterRole name: fluentd-es apiGroup: --- apiVersion: apps/v1 kind: DaemonSet metadata: name: fluentd-es namespace: logging labels: k8s-app: fluentd-es version: v2.0.4 kubernetes.io/cluster-service: true addonmanager.kubernetes.io/mode: Reconcile spec: selector: matchLabels: k8s-app: fluentd-es version: v2.0.4 template: metadata: labels: k8s-app: fluentd-es kubernetes.io/cluster-service: true version: v2.0.4 # This annotation ensures that fluentd does not get evicted if the node # supports critical pod annotation based priority scheme. # Note that this does not guarantee admission on the nodes (#40573). annotations: scheduler.alpha.kubernetes.io/critical-pod: spec: serviceAccountName: fluentd-es containers: - name: fluentd-es image: cnych/fluentd-elasticsearch:v2.0.4 env: - name: FLUENTD_ARGS value: --no-supervisor -q resources: limits: memory: 500Mi requests: cpu: 100m memory: 200Mi volumeMounts: - name: varlog mountPath: /var/log - name: varlibdockercontainers mountPath: /var/lib/docker/containers readOnly: true - name: config-volume mountPath: /etc/fluent/config.d nodeSelector: beta.kubernetes.io/fluentd-ds-ready: true tolerations: - key: node-role.kubernetes.io/master operator: Exists effect: NoSchedule terminationGracePeriodSeconds: 30 volumes: - name: varlog hostPath: path: /var/log - name: varlibdockercontainers hostPath: path: /var/lib/docker/containers - name: config-volume configMap: name: fluentd-config 创建节点标签 [rootxianchaomaster1 ~]# kubectl label nodes xianchaomaster1 beta.kubernetes.io/fluentd-ds-readytrue [rootxianchaomaster1 ~]# kubectl label nodes xianchaonode1 beta.kubernetes.io/fluentd-ds-readytrue [rootxianchaomaster1 ~]# kubectl apply  -f fluentd-daemonset.yaml  2、接入kafka cat kafka-config.yaml kind: ConfigMap apiVersion: v1 metadata: name: fluentd-config namespace: logging labels: addonmanager.kubernetes.io/mode: Reconcile data: system.conf: |- system root_dir /tmp/fluentd-buffers/ /system containers.input.conf: |- source id fluentd-containers.log type tail path /var/log/containers/*.log pos_file /var/log/es-containers.log.pos time_format %Y-%m-%dT%H:%M:%S.%NZ localtime tag raw.kubernetes.* format json read_from_head true /source # Detect exceptions in the log output and forward them as one log entry. match raw.kubernetes.** id raw.kubernetes type detect_exceptions remove_tag_prefix raw message log stream stream multiline_flush_interval 5 max_bytes 500000 max_lines 1000 /match system.input.conf: |- # Logs from systemd-journal for interesting services. source id journald-docker type systemd filters [{ _SYSTEMD_UNIT: docker.service }] storage type local persistent true /storage read_from_head true tag docker /source source id journald-kubelet type systemd filters [{ _SYSTEMD_UNIT: kubelet.service }] storage type local persistent true /storage read_from_head true tag kubelet /source forward.input.conf: |- # Takes the messages sent over TCP source type forward /source output.conf: |- # Enriches records with Kubernetes metadata filter kubernetes.** type kubernetes_metadata /filter match ** id kafka type kafka2 log_level info include_tag_key true # list of seed brokers brokers kafka ip:9092 use_event_time true # buffer settings buffer type file path /var/log/fluentd-buffers/kubernetes.system.buffer flush_mode interval retry_type exponential_backoff flush_thread_count 2 flush_interval 5s retry_forever retry_max_interval 30 chunk_limit_size 2M queue_limit_length 8 overflow_action block /buffer # data type settings format type json /format # topic settings topic_key topic default_topic messages # producer settings required_acks -1 compression_codec gzip /match 重启fluentd  3、配置logstash 配置logstash消费messages日志写入elasticsearch cat config/kafkaInput_fluentd.conf input { kafka { bootstrap_servers [kafka ip:9092] client_id fluentd group_id fluentd consumer_threads 1 auto_offset_reset latest topics [messages] } } filter { json{ source message } ruby { code event.set(timestamp, event.get(timestamp).time.localtime 8*60*60) } ruby { code event.set(timestamp,event.get(timestamp)) } ruby { code event.set(find_time,event.get(timestamp).time.localtime - 8*60*60) } mutate { remove_field [timestamp] remove_field [message] } } output { elasticsearch{ hosts [es ip地址: 9200] index kubernetes_%{YYYY_MM_dd} } #    stdout { #           codec rubydebug #           } } 4、启动logstash nohup ./bin/logstash -f config/kafkaInput_fluentd.conf --config.reload.automatic --path.data/opt/logstash/data_fluentd 21 fluentd.log
http://www.yingshimen.cn/news/6881/

相关文章:

  • 南宁网站建设公司哪家专业工作室网站制作
  • 成品网站源码手机分销网站公司
  • 制作服务网站广州旅游景点
  • 清远住房和城乡建设局网站搜狗推广和百度推广哪个好
  • 深圳外贸电商网站建设wordpress网站重新安装
  • 郑州网站建设代运营福建省建设厅网站林瑞良
  • 360网站建设价格网站建设报价单 下载
  • 黑龙江省建设工程质量安全协会网站百度数据研究中心官网
  • 北京那个网站建设公司比较好微信商城怎么开店
  • 做网站设计哪家好专门做网站的公司与外包公司有哪些
  • 大连开发区网站开发公司电话简约大气的ppt模板
  • 获奖设计网站成都工业设计公司排名
  • 建设公司网站需要准备哪些材料扶贫网站建设
  • 网站建设优化服务机构陕西省建设工程质量监督局网站
  • 医疗设备公司的网站怎么做报班学平面设计
  • 南京移动网站建设哪里强网件路由器登陆网址
  • 优秀网站的颜色搭配有哪些可以做问卷的网站
  • 潍坊 网站成品视频直播软件推荐哪个好一点ios
  • 漳州城乡住房建设部网站简约创意情人节海报设计
  • 重庆免费发布信息网站南县做网站
  • 营口规划建设局网站怎么做网站赚
  • 如何创建百度网站上海住房与建设部网站
  • 南京建设局的网站首页免费建站网站建设
  • 网站开发周期是什么意思海尔集团电商网站建设
  • 网站建设公司果动c网站开发好做吗
  • 北湖区网站建设公司哪家好wordpress文章展示相册
  • 移动网站开发教学大纲代理公司是干什么的
  • 网站建设实训心得体会2023房价即将暴涨十大城市
  • 电影网站建设报价给用ps做的网站加div
  • 小程序网站开发单页网站模板