博主信息
博文 15
粉丝 0
评论 0
访问量 12471
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
MongoDB-Elasticsearch 实时数据导入
Tina
原创
1102人浏览过

搜索功能是App必不可少的一部分,我们使用目前比较流行的Elasticsearch进行全文检索。我们的数据主要存储在MongoDB中,如何将这些数据导入到Elasticsearch中,并能一直保持同步呢?做法大致分为两种:在应用层操作,在读写MongoDB的同时读写Elasticsearch,比如mongoosastic,需要修改已有的业务代码。与业务无关,通过读取MongoDB的replica oplog,将MongoDB产生的操作在Elasticsearch上replay,来实现单向同步。

为了减少老代码修改成本,我们选择了第二种方案,使用mongo-connector来进行数据同步。然而用着用着我们发现mongo-connector有一些问题:

有些数据需要关联查询,但是mongo-connector并不支持parent-child模型(其实有一个fork是支持的,但已经落后主分支一个版本,并且合进主分支的希望渺茫)。
mongo-connector支持断点续传,但是恢复速度非常缓慢。mongo-connector可以设置每次处理的文档数量,但坑爹的地方在于,到不了设置的数字,它始终不会写入。比如,MongoDB一个表只有100个文档,但是设置了batch的size为1000,于是那100个文档这辈子也同步不到Elasticsearch中了。mongo-connector不会限速,直接把Elasticsearch写炸了,但它不会管,接着写,而且中间丢掉的数据就算后面有oplog里面有update操作,也没办法恢复,会报出404错误。在MongoDB里面存了一张meta表,在Elasticsearch里面也存了一个meta索引,里面存了大量的timestamp,直接使Elasticsearch文档总数翻倍。(以上mongo-connector的缺点,如有诽谤,或许是我们不会用,恳请斧正。)


于是我们开始寻找更好用的工具,却发现没有好用的工具:

Elasticsearch Rivers,曾经的官方同步工具。但该项目早已废弃。Transporter,IBM旗下的Compose公司出品的同步工具。也不支持parent-child relationship,并且项目进度缓慢。
elasticsearch-hadoop,先导到hadoop,再导到Elasticsearch。高射炮打蚊子,绕一大圈,不经济。

没办法,只好自己用TypeScript写一个,取名为mongo-es。



mongo-es导入数据分为两个阶段:

Scan:扫描整个MongoDB的collection,每条文档都插入到Elasticsearch对应的index里面。使用Bulk API,进行批量写入。在扫描开始前记录当前的时间点,供第二阶段使用。
Tail:从刚才记录的时间点,或一个指定的卖二手手机号码时间点开始,将MongoDB的oplog在Elasticsearch上进行replay。使用RxJS的bufferWithTimeOrCount函数,既能批量写入,又能保证同步延迟不会很长(一般是一秒左右)。

mongo-es比mongo-connector进步的地方有:

支持parent-child relationship,可以处理需要join的数据。
可以逆序Scan,先导入最新的数据,这对于出错后重建索引快速恢复非常有用。无需在两边存储多余元数据,只记录oplog的timestamp。只要程序挂的时间不太长,oplog里面还有这个timestamp,就能恢复。遇到缺失文档自动恢复。当因为不可控因素(如网络原因),导致某个本应已经同步了的文档在Elasticsearch中不存在。这时如果oplog里面遇到一个对该文档的update操作,mongo-connector无法处理,打印出404错误。遇到这种情况时,mongo-es会回到MongoDB中,读取到这个文档,进行更新。有限速功能,能够限制每秒钟读取的文档数量,避免把Elasticsearch压垮。

当然了,mongo-connector是一个更加通用的程序,可以把文档导到更多的地方。mongo-es只是把MongoDB的数据导入到Elasticsearch中,这样比较未免有些不公平,但就在MongoDB到Elasticsearch这个使用场景下,还是好用不少的。



开发过程中踩过的坑:

Scan阶段使用stream,方便控制读取速度。Tail阶段使用cursor,配合noCursorTimeout参数,避免长时间没有oplog时的超时错误。Tail阶段如果用stream,即使是设置了noCursorTimeout,超时了也会报错。对于操作是update的oplog,oplog里面有可能是一个完整的文档,这时候直接就可以写入。也有可能是$set或$unset操作,这时候要去Elasticsearch里面取到旧的,完整的文档,在内存里执行update后再写入回去。最好不要直接读MongoDB,以减少MongoDB负担。在内存中执行update时,也要检查变化的字段是否属于我们需要的字段。如果变化的都不是需要的字段,可以忽略这次update操作,如果变化的字段不在我们需要的范围内,则应排除,以减少写入次数。有_parent的文档是不能直接用_id访问到的,因为它的routing是_parent,必须指定_parent的值才行。对于操作是update的oplog,我们只能拿到_id,拿不到_parent对应的字段,所以这时要用es.search代替es.get,访问每个分片,才能拿到文档。Timestamp在js代码里表示时low在前,high在后。在mongo shell里面是反过来的。Bulk API传入的body长度不能为0,遇到0的情况要跳过,否则会报错。

现已开源至github,并发布到了npm,欢迎大家多多试用,多挑(ti)毛(xu)病(qiu)。


本博文版权归博主所有,转载请注明地址!如有侵权、违法,请联系admin@php.cn举报处理!
全部评论 文明上网理性发言,请遵守新闻评论服务协议
0条评论
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号

  • 登录PHP中文网,和优秀的人一起学习!
    全站2000+教程免费学