config.put("cluster.name", ConfigUtil.getString(ES_CLUSTER_NAME, "flinktest"));
config.put("bulk.flush.max.actions", ConfigUtil.getString(ES_BULK_FLUSH_MAX_ACTIONS, "3000"));
config.put("bulk.flush.max.size.mb", ConfigUtil.getString(ES_BULK_FLUSH_MAX_SIZE_MB, "50"));
config.put("bulk.flush.interval.ms", ConfigUtil.getString(ES_BULK_FLUSH_INTERVAL, "3000"));
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
initEnv(env);
Properties properties = ConfigUtil.getProperties(CONFIG_FILE_PATH);
//从kafka中获取轨迹数据
FlinkKafkaConsumer010<String> flinkKafkaConsumer010 =
new FlinkKafkaConsumer010<>(properties.getProperty("topic.name"), new SimpleStringSchema(), properties);
//从checkpoint最新处消费
flinkKafkaConsumer010.setStartFromLatest();
DataStreamSource<String> streamSource = env.addSource(flinkKafkaConsumer010);
//Sink2ES
streamSource.map(s -> JSONObject.parseObject(s, Trajectory.class))
.addSink(EsSinkFactory.createSinkFunction(new TrajectoryDetailEsSinkFunction())).name("esSink");
env.execute("flinktest");
任务容器数为24个container,一共48个并发。savepoint为15分钟
streamSource.map(s -> JSONObject.parseObject(s, FurionContext.class)).name("withnosink");
取消sink2ES的操作后,QPS达到110k,是之前QPS的十倍。由此可以基本判定: ES集群写性能导致的上游反压
索引字段类型调整
降低副本数
curl -XPUT{集群地址}/{索引名称}/_settings?timeout=3m -H "Content-Type: application/json" -d'{"number_of_replicas":"0"}'
针对这种ToB、日志型、实时性要求不高的场景,我们不需要查询的实时性,通过加大甚至关闭refresh_interval的参数提高写入性能。
curl -XPUT{集群地址}/{索引名称}/_settings?timeout=3m -H "Content-Type: application/json" -d '{ "settings": { "index": {"refresh_interval" : -1 } } }'
在flink执行时,通过Grafana观测各个节点CPU 使用率以及通过linux命令查看各个节点CPU核数。发现CPU使用率高的节点CPU核数比其余节点少。为了排除这个短板效应,我们将在这个节点中的索引shard移动到CPU核数多的节点。
curl -XPOST {集群地址}/_cluster/reroute -d'{"commands":[{"move":{"index":"{索引名称}","shard":5,"from_node":"源node名称","to_node":"目标node名称"}}]}' -H "Content-Type:application/json"
以上优化的效果:
经过以上的优化,我们发现写入性能提升有限
。因此,需要深入查看写入的瓶颈点
"elasticsearch[ES-077-079][bulk][T#3]" Id=247 WAITING on java.util.concurrent.LinkedTransferQueue@369223fa
at sun.misc.Unsafe.park(Native Method)
- waiting on java.util.concurrent.LinkedTransferQueue@369223fa
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:737)
at java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:647)
at java.util.concurrent.LinkedTransferQueue.take(LinkedTransferQueue.java:1269)
at org.elasticsearch.common.util.concurrent.SizeBlockingQueue.take(SizeBlockingQueue.java:161)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
从上面的线程堆栈我们可以看出线程处于等待状态。
关于这个问题的讨论详情查看,这个issue讨论大致意思是:节点数不够,需要增加节点。于是我们又增加节点并通过设置索引级别的total_shards_per_node参数将索引shard的写入平均到各个节点上)
经过以上的优化,我们发现在kafka中的topic积压有明显变少的趋势:
参照官网:
indices.memory.index_buffer_size : 10%
索引写入ES的基本流程是:1.数据写入buffer缓冲和translog 2.每秒buffer的数据生成segment并进入内存,此时segment被打开并供search使用查询 3.buffer清空并重复上述步骤 4.buffer不断
添加、清空
translog不断累加,当达到某些条件触发commit操作,刷到磁盘。es默认的刷盘操作为request但容易部分操作比较耗时,在日志型集群、允许数据在刷盘过程中少量丢失可以改成异步async
另外一次commit操作是在translog达到某个阈值执行的,可以把大小(flush_threshold_size )调大,刷新间隔调大。
index.translog.durability : async
index.translog.flush_threshold_size : 1gb
index.translog.sync_interval : 30s
效果:
当ES写入性能遇到瓶颈时,我总结的思路应该是这样:
因篇幅问题不能全部显示,请点此查看更多更全内容