背景 有这样一个业务场景, 玩家可以在游戏里面建立各种房间, 房间里面可以运行各种各样的游戏. 其中, 房间的数据就是用MongoDB记录, 每一条文档对应的是一个房间. 为了观察房间状态的变化以及统计玩家的用户行为, 需要对MongoDB的数据进行收集和统计.
本来打算简单用个Grafana设置MongoDB做数据源, 再弄个dashboard就完事了, 奈何Grafana需要商用证书才可以使用MongoDB的插件, 最后就计划用ES.
搜了一圈开源社区, 也没有找到可以订阅MongoDB再输出到ES的中间件, 最后只好自己写了.
环境信息:
可行性验证 需求很简单, 就是要定期查询MongoDB中一部分特定文档, 插入到ES的特定索引中.
计划使用Python, 并通过交互式运行. 需要完成对MongoDB的查询以及到ES的入库过程.
MongoDB客户端 使用pymongo 4.3.3
版本, 关于连接MongoDB方法, 查看文档如下:
在本例中, 连接MongoDB的代码如下:
1 2 3 4 5 6 7 >>> import pymongo>>> dburl="mongodb://mongouser:xxxxxx@10.3.2.26:27017,10.3.2.4:27017,10.3.2.24:27017/?replicaSet=aaa111" >>> myclient=pymongo.MongoClient(dburl)>>> myclientMongoClient(host=['10.3.2.26:27017' , '10.3.2.24:27017' , '10.3.2.4:27017' ], document_class=dict , tz_aware=False , connect=True , replicaset='aaa111' ) >>> type (myclient)<class 'pymongo .mongo_client .MongoClient '>
初始化了一个实例myclient
, 用于连接MongoDB, 后面我们的所有MongoDB相关操作, 都是通过这个实例进行.
MongoClient
这个类, 在site-packages/pymongo/mongo_client.py
中定义.
也可以在python的交互式界面通过help()
或者dir()
功能去查看文档, 但笔者一般习惯直接看线上文档 .
数据查询 在MongoClient
类中有__getitem__
方法, 可以返回所有的Database:
1 2 3 4 5 6 7 8 9 10 11 12 13 class MongoClient (common.BaseObject, Generic [_DocumentType] ): ...省略... def __getitem__ (self, name: str ) -> database.Database[_DocumentType]: """Get a database by name. Raises :class:`~pymongo.errors.InvalidName` if an invalid database name is used. :Parameters: - `name`: the name of the database to get """ return database.Database(self, name) ...省略...
接着, 在文件database.py
中约定的Database
类中, 也有__getitem__
方法, 可以返回所有的Collection:
1 2 3 4 5 6 7 8 9 10 11 12 13 class Database (common.BaseObject, Generic [_DocumentType] ): """A Mongo database.""" ...省略... def __getitem__ (self, name: str ) -> "Collection[_DocumentType]": """Get a collection of this database by name. Raises InvalidName if an invalid collection name is used. :Parameters: - `name`: the name of the collection to get """ return Collection(self, name) ...省略...
又可以继续深挖, 在collection.py
定义的Collection
中, 有find方法, 可以开始查询:
1 2 3 4 5 class Collection (common.BaseObject, Generic [_DocumentType] ): """A Mongo collection.""" ...省略... def find (self, *args: Any , **kwargs: Any ) -> Cursor[_DocumentType]: return Cursor(self, *args, **kwargs)
最后, 一通摸索下来, 要搜索ZoneService
库里面的zones
集合, 我们就可以这样进行查询:
1 2 3 4 >>> result = myclient['ZoneService' ]['zones' ].find({})>>> result[0 ]{'_id' : ObjectId('649c11644226789ff4f682ae' ), 'groupId' : '200' , 'projectType' : 4 , 'version' : 'this is a secret' , 'role' : '' , 'platformBitTypesForNavigation' : 0 , 'platformBitTypesForMatching' : 7 , 'serverGuid' : '01002405-76BD-4BFC-BCED-8DE4C8541AFA' , 'zoneGuid' : 'C6385685-74E9-4673-A696-F0EDF7040296' , 'zoneExtendType' : 4 , 'zoneSearchKey' : '3901862586:1' , 'zoneSearchUniqueKey' : 'NotUse:38ec66bd-2e6f-4c2f-a0e4-c004879ef467' , 'zoneId' : 0 , 'ugcId' : '3901862586' , 'ugcVersionId' : '1' , 'ugcTitle' : '测试数据' , 'maxPlayerCount' : 2 , 'reservedCount' : 0 , 'connectionCount' : 0 , 'activeCount' : 0 , 'channelNo' : 0 , 'enterLimit' : False , 'enterOwner' : False , 'ownerMemberNo' : 0 , 'reservedUsers' : [], 'enteredUsers' : [], 'isCheckValid' : True , 'createAt' : datetime.datetime(2023 , 6 , 28 , 10 , 54 , 28 , 693000 ), 'lastTryEnterableCheckingAt' : datetime.datetime(2023 , 6 , 27 , 10 , 54 , 28 , 693000 )} >>>
可以看到, 我们可以通过下标直接查看查询结果, 也可以使用for
进行遍历. MongoDB中的文档, 以字典的形式呈现出来.
连接ES 和pymongo
不一样, elasticsearch
库的代码结构又是另一个风格.
首先我们要在elasticsearch/client/__init__.py
里面看到客户端类Elasticsearch
, 在初始化的时候, 会通过引用其他对象, 实现对ES的REST调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 class Elasticsearch (object ): def __init__ (self, hosts=None , transport_class=Transport, **kwargs ): """ :arg hosts: list of nodes, or a single node, we should connect to. Node should be a dictionary ({"host": "localhost", "port": 9200}), the entire dictionary will be passed to the :class:`~elasticsearch.Connection` class as kwargs, or a string in the format of ``host[:port]`` which will be translated to a dictionary automatically. If no value is given the :class:`~elasticsearch.Connection` class defaults will be used. :arg transport_class: :class:`~elasticsearch.Transport` subclass to use. :arg kwargs: any additional arguments will be passed on to the :class:`~elasticsearch.Transport` class and, subsequently, to the :class:`~elasticsearch.Connection` instances. """ self.transport = transport_class(_normalize_hosts(hosts), **kwargs) self.async_search = AsyncSearchClient(self) self.autoscaling = AutoscalingClient(self) self.cat = CatClient(self) self.cluster = ClusterClient(self) self.dangling_indices = DanglingIndicesClient(self) self.indices = IndicesClient(self) self.ingest = IngestClient(self) self.nodes = NodesClient(self) self.remote = RemoteClient(self) self.snapshot = SnapshotClient(self) self.tasks = TasksClient(self) self.xpack = XPackClient(self) self.ccr = CcrClient(self) self.data_frame = Data_FrameClient(self) self.deprecation = DeprecationClient(self) self.enrich = EnrichClient(self) self.eql = EqlClient(self) self.graph = GraphClient(self) self.ilm = IlmClient(self) self.indices = IndicesClient(self) self.license = LicenseClient(self) self.migration = MigrationClient(self) self.ml = MlClient(self) self.monitoring = MonitoringClient(self) self.rollup = RollupClient(self) self.searchable_snapshots = SearchableSnapshotsClient(self) self.security = SecurityClient(self) self.slm = SlmClient(self) self.sql = SqlClient(self) self.ssl = SslClient(self)
根据文档, 在python中连接ES, 方法如下:
1 2 3 4 5 >>> from elasticsearch import Elasticsearch >>> es_url="http://elastic:******@10.3.2.37:9200/" >>> es = Elasticsearch(host=es_url)>>> type (es)<class 'elasticsearch .client .Elasticsearch '>
在Elasticsearch
类中, 还定义了info()
方法, 查看连接信息:
1 2 3 4 5 6 7 8 9 def info (self, params=None , headers=None ): """ Returns basic information about the cluster. `<https://www.elastic.co/guide/en/elasticsearch/reference/7.10/index.html>`_ """ return self.transport.perform_request( "GET" , "/" , params=params, headers=headers )
从代码看, 就是请求了ES服务的/
, 正常情况下, 会返回ES的一些集群信息. 测试一下:
1 2 >>> es.info() {'name': '1642044271000021132', 'cluster_name': 'es-di9ad7tb', 'cluster_uuid': 'yZbbvEbAQvCuF9tZDilvMA', 'version': {'number': '7.10.1', 'build_flavor': 'default', 'build_type': 'tar', 'build_hash': '187b1511f798e4d23625ffa92ccf5c44840e2650', 'build_date': '2021-12-22T12:45:12.223537200Z', 'build_snapshot': False, 'lucene_version': '8.7.0', 'minimum_wire_compatibility_version': '6.8.0', 'minimum_index_compatibility_version': '6.0.0-beta1'}, 'tagline': 'You Know, for Search'}
向ES插入数据 使用Elasticsearch
类中的index
方法, 通过POST的方式, 把文档插入到ES的索引中.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def index (self, index, body, doc_type=None , id =None , params=None , headers=None ): for param in (index, body): if param in SKIP_IN_PATH: raise ValueError("Empty value passed for a required argument." ) if doc_type is None : doc_type = "_doc" return self.transport.perform_request( "POST" if id in SKIP_IN_PATH else "PUT" , _make_path(index, doc_type, id ), params=params, headers=headers, body=body, )
把刚才从MongoDB中查询到的数据, 发送到ES的索引看看:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 >>> type (result[0 ])<class 'dict '> >>> es .index (index='rondo-temp-test' , body=result[0 ] ) Traceback (most recent call last ): File "/root/rondo/zones-monitor/lib/python3.9/site-packages/elasticsearch/serializer.py" , line 130 , in dumps return json.dumps( File "/usr/local/python3/lib/python3.9/json/__init__.py" , line 234 , in dumps return cls( File "/usr/local/python3/lib/python3.9/json/encoder.py" , line 199 , in encode chunks = self.iterencode(o, _one_shot=True ) File "/usr/local/python3/lib/python3.9/json/encoder.py" , line 257 , in iterencode return _iterencode(o, 0 ) File "/root/rondo/zones-monitor/lib/python3.9/site-packages/elasticsearch/serializer.py" , line 116 , in default raise TypeError("Unable to serialize %r (type: %s)" % (data, type (data))) TypeError: Unable to serialize ObjectId('649c11644226789ff4f682ae' ) (type : <class 'bson .objectid .ObjectId '>) During handling of the above exception , another exception occurred :Traceback (most recent call last): File "<stdin>" , line 1 , in <module> File "/root/rondo/zones-monitor/lib/python3.9/site-packages/elasticsearch/client/utils.py" , line 152 , in _wrapped return func(*args, params=params, headers=headers, **kwargs) File "/root/rondo/zones-monitor/lib/python3.9/site-packages/elasticsearch/client/__init__.py" , line 398 , in index return self.transport.perform_request( File "/root/rondo/zones-monitor/lib/python3.9/site-packages/elasticsearch/transport.py" , line 350 , in perform_request method, params, body, ignore, timeout = self._resolve_request_args( File "/root/rondo/zones-monitor/lib/python3.9/site-packages/elasticsearch/transport.py" , line 416 , in _resolve_request_args body = self.serializer.dumps(body) File "/root/rondo/zones-monitor/lib/python3.9/site-packages/elasticsearch/serializer.py" , line 134 , in dumps raise SerializationError(data, e) elasticsearch.exceptions.SerializationError: ({'_id' : ObjectId('649c11644226789ff4f682ae' ), 'groupId' : '200' , 'projectType' : 4 , 'version' : 'this is a secret' , 'role' : '' , 'platformBitTypesForNavigation' : 0 , 'platformBitTypesForMatching' : 7 , 'serverGuid' : '01002405-76BD-4BFC-BCED-8DE4C8541AFA' , 'zoneGuid' : 'C6385685-74E9-4673-A696-F0EDF7040296' , 'zoneExtendType' : 4 , 'zoneSearchKey' : '3901862586:1' , 'zoneSearchUniqueKey' : 'NotUse:38ec66bd-2e6f-4c2f-a0e4-c004879ef467' , 'zoneId' : 0 , 'ugcId' : '3901862586' , 'ugcVersionId' : '1' , 'ugcTitle' : '测试数据' , 'maxPlayerCount' : 2 , 'reservedCount' : 0 , 'connectionCount' : 0 , 'activeCount' : 0 , 'channelNo' : 0 , 'enterLimit' : False , 'enterOwner' : False , 'ownerMemberNo' : 0 , 'reservedUsers' : [], 'enteredUsers' : [], 'isCheckValid' : True , 'createAt' : datetime.datetime(2023 , 6 , 28 , 10 , 54 , 28 , 693000 ), 'lastTryEnterableCheckingAt' : datetime.datetime(2023 , 6 , 27 , 10 , 54 , 28 , 693000 )}, TypeError("Unable to serialize ObjectId('649c11644226789ff4f682ae') (type: <class 'bson.objectid.ObjectId'>)" )) >>>
出现报错了, 报错内容是: TypeError("Unable to serialize ObjectId('649c11644226789ff4f682ae')
.
我们可以看到, 在result[0]
中的这条数据, 包含了一个MongoDB的类:
1 2 3 4 >>> result[0]['_id'] ObjectId('649c11644226789ff4f682ae') >>> type(result[0]['_id']) <class 'bson.objectid.ObjectId'>
ES无法处理这种BSON类, 本来是想着删除就完事的, 但是考虑到可以以文本的形式保留下来, 后面在排查业务问题的时候, 也可以通过这个_id
字段定位到MongoDB的原始数据.
这次在pymongo
部分使用find_one
方法, 只查一条数据作为示例, 解决方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 >>> result = myclient['ZoneService' ]['zones' ].find_one({})>>> type (result)<class 'dict '> >>> result {'_id ': ObjectId('649c11644226789ff4f682ae' ), 'groupId' : '200' , 'projectType' : 4 , 'version' : 'this is a secret' , 'role' : '' , 'platformBitTypesForNavigation' : 0 , 'platformBitTypesForMatching' : 7 , 'serverGuid' : '01002405-76BD-4BFC-BCED-8DE4C8541AFA' , 'zoneGuid' : 'C6385685-74E9-4673-A696-F0EDF7040296' , 'zoneExtendType' : 4 , 'zoneSearchKey' : '3901862586:1' , 'zoneSearchUniqueKey' : 'NotUse:38ec66bd-2e6f-4c2f-a0e4-c004879ef467' , 'zoneId' : 0 , 'ugcId' : '3901862586' , 'ugcVersionId' : '1' , 'ugcTitle' : '测试数据' , 'maxPlayerCount' : 2 , 'reservedCount' : 0 , 'connectionCount' : 0 , 'activeCount' : 0 , 'channelNo' : 0 , 'enterLimit' : False , 'enterOwner' : False , 'ownerMemberNo' : 0 , 'reservedUsers' : [], 'enteredUsers' : [], 'isCheckValid' : True , 'createAt' : datetime.datetime(2023 , 6 , 28 , 10 , 54 , 28 , 693000 ), 'lastTryEnterableCheckingAt' : datetime.datetime(2023 , 6 , 27 , 10 , 54 , 28 , 693000 )}>>> result['mongo_id' ]Traceback (most recent call last): File "<stdin>" , line 1 , in <module> KeyError: 'mongo_id' >>> result['mongo_id' ] = str (result['_id' ])>>> result['mongo_id' ]'649c11644226789ff4f682ae' >>> type (result['mongo_id' ])<class 'str '> >>> del (result['_id' ] ) >>> es .index (index='rondo-temp-test' , body=result ) /root /rondo /zones -monitor /lib /python3 .9/site -packages /elasticsearch /connection /base .py : 190 : ElasticsearchDeprecationWarning: index [rondo-temp-test] matches multiple legacy templates [default@template, scene@template], composable templates will only match a single template warnings.warn(message, category=ElasticsearchDeprecationWarning) {'_index' : 'rondo-temp-test' , '_type' : '_doc' , '_id' : '0E3xZ4kBTkcA723hKye8' , '_version' : 1 , 'result' : 'created' , '_shards' : {'total' : 2 , 'successful' : 2 , 'failed' : 0 }, '_seq_no' : 0 , '_primary_term' : 1 } >>>
成功!
整合 确定了可行性之后, 接下来就得梳理一下业务需求了:
定期查询MongoDB中特定的集合, 把查询结果的所有文档保存到ES
使用Python运行, 在Python中通过无限循环实现数据监控
通过环境变量读取关键配置信息, 数据查询频率可调
通过stdout输出关键信息
考虑跨时区问题
配置参数 结合需求分析, 不难看出, 起码有四个可配置项, 分别设置成环境变量:
在Python代码中, 我们可以通过os
库获取环境变量:
1 2 3 >>> import os >>> os.getenv('HOME') '/root'
mongodb的timezone 眼尖的笔者还发现, 在上面的摸索过程中看到, MongoDB的数据中有部分datetime
数据, 但是没有带时间戳.
当保存到ES之后, 在ES中记录如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 GET /rondo-temp-test/_doc/0E3 xZ4kBTkcA723hKye8 { "_index" : "rondo-temp-test" , "_type" : "_doc" , "_id" : "0E3xZ4kBTkcA723hKye8" , "_version" : 1 , "_seq_no" : 0 , "_primary_term" : 1 , "found" : true , "_source" : { "groupId" : "200" , "projectType" : 4 , "version" : "this is a secret" , "role" : "" , "platformBitTypesForNavigation" : 0 , "platformBitTypesForMatching" : 7 , "serverGuid" : "01002405-76BD-4BFC-BCED-8DE4C8541AFA" , "zoneGuid" : "C6385685-74E9-4673-A696-F0EDF7040296" , "zoneExtendType" : 4 , "zoneSearchKey" : "3901862586:1" , "zoneSearchUniqueKey" : "NotUse:38ec66bd-2e6f-4c2f-a0e4-c004879ef467" , "zoneId" : 0 , "ugcId" : "3901862586" , "ugcVersionId" : "1" , "ugcTitle" : "测试数据" , "maxPlayerCount" : 2 , "reservedCount" : 0 , "connectionCount" : 0 , "activeCount" : 0 , "channelNo" : 0 , "enterLimit" : false , "enterOwner" : false , "ownerMemberNo" : 0 , "reservedUsers" : [ ], "enteredUsers" : [ ], "isCheckValid" : true , "createAt" : "2023-06-28T10:54:28.693000" , "lastTryEnterableCheckingAt" : "2023-06-27T10:54:28.693000" , "mongo_id" : "649c11644226789ff4f682ae" } }
可以看到, createAt
和lastTryEnterableCheckingAt
这两个字段里面的时间戳, 都没有携带时区信息.
也可能是笔者接触过太多次涉及到跨时区的沟通, 也厌倦了跟一些地理知识欠缺的同事(主要是海外的)去解释这些问题, 每次设计类似的方案的时候, 都会习惯性考虑到这个问题. 解决思路也很简单, 所有datetime
数据都带上timezone就可以了. 入库的时候, 不管是使用的北京时间还是伦敦时间, 都没关系, 关键是要在时间戳上面明确一下时区, 剩下的转换以及展示问题, 就交给程序.
查看了一下pymongo的关于datetime的文档 我们可以知道, 默认情况下, MongoDB都是使用UTC时间存储datetime
数据, 但不会显示时区. 如果想要显示时区, 则需要在客户端配置tz_aware
.
又再看了一下具体的配置方法, 得知需要通过bson的库配置CodecOptions
最后综合文档里面的示例代码, 笔者测试了一下:
1 2 3 4 >>> from bson.codec_options import CodecOptions>>> result2 = myclient['ZoneService' ]['zones' ].with_options(codec_options=CodecOptions(tz_aware=True )).find_one({})>>> result2{'_id' : ObjectId('649c11644226789ff4f682ae' ), 'groupId' : '200' , 'projectType' : 4 , 'version' : 'this is a secret' , 'role' : '' , 'platformBitTypesForNavigation' : 0 , 'platformBitTypesForMatching' : 7 , 'serverGuid' : '01002405-76BD-4BFC-BCED-8DE4C8541AFA' , 'zoneGuid' : 'C6385685-74E9-4673-A696-F0EDF7040296' , 'zoneExtendType' : 4 , 'zoneSearchKey' : '3901862586:1' , 'zoneSearchUniqueKey' : 'NotUse:38ec66bd-2e6f-4c2f-a0e4-c004879ef467' , 'zoneId' : 0 , 'ugcId' : '3901862586' , 'ugcVersionId' : '1' , 'ugcTitle' : '测试数据' , 'maxPlayerCount' : 2 , 'reservedCount' : 0 , 'connectionCount' : 0 , 'activeCount' : 0 , 'channelNo' : 0 , 'enterLimit' : False , 'enterOwner' : False , 'ownerMemberNo' : 0 , 'reservedUsers' : [], 'enteredUsers' : [], 'isCheckValid' : True , 'createAt' : datetime.datetime(2023 , 6 , 28 , 10 , 54 , 28 , 693000 , tzinfo=<bson.tz_util.FixedOffset object at 0x7f5a1ea4c820 >), 'lastTryEnterableCheckingAt' : datetime.datetime(2023 , 6 , 27 , 10 , 54 , 28 , 693000 , tzinfo=<bson.tz_util.FixedOffset object at 0x7f5a1ea4c820 >)}
好了, 现在在MongoDB的查询结果里面, datatime
数据会有带上时区数据了. 然后笔者突然又想起来了, 好像是在初始化MongoClient
的时候就有一个tz_aware
的配置项, 看了一眼文档, 再试一下:
1 2 3 4 5 6 >>> myclient=pymongo.MongoClient(host=dburl,tz_aware=True )>>> myclientMongoClient(host=['10.3.2.26:27017' , '10.3.2.24:27017' , '10.3.2.4:27017' ], document_class=dict , tz_aware=True , connect=True , replicaset='aaa111' ) >>> result3 = myclient['ZoneService' ]['zones' ].find_one({})>>> result3{'_id' : ObjectId('649c11644226789ff4f682ae' ), 'groupId' : '200' , 'projectType' : 4 , 'version' : 'this is a secret' , 'role' : '' , 'platformBitTypesForNavigation' : 0 , 'platformBitTypesForMatching' : 7 , 'serverGuid' : '01002405-76BD-4BFC-BCED-8DE4C8541AFA' , 'zoneGuid' : 'C6385685-74E9-4673-A696-F0EDF7040296' , 'zoneExtendType' : 4 , 'zoneSearchKey' : '3901862586:1' , 'zoneSearchUniqueKey' : 'NotUse:38ec66bd-2e6f-4c2f-a0e4-c004879ef467' , 'zoneId' : 0 , 'ugcId' : '3901862586' , 'ugcVersionId' : '1' , 'ugcTitle' : '测试数据' , 'maxPlayerCount' : 2 , 'reservedCount' : 0 , 'connectionCount' : 0 , 'activeCount' : 0 , 'channelNo' : 0 , 'enterLimit' : False , 'enterOwner' : False , 'ownerMemberNo' : 0 , 'reservedUsers' : [], 'enteredUsers' : [], 'isCheckValid' : True , 'createAt' : datetime.datetime(2023 , 6 , 28 , 10 , 54 , 28 , 693000 , tzinfo=<bson.tz_util.FixedOffset object at 0x7f5a1ea4c820 >), 'lastTryEnterableCheckingAt' : datetime.datetime(2023 , 6 , 27 , 10 , 54 , 28 , 693000 , tzinfo=<bson.tz_util.FixedOffset object at 0x7f5a1ea4c820 >)}
似乎都可以, 但是直接在MongoClient
中设置, 代码可以精简一点. 也都可以入库ES:
1 2 3 4 5 6 7 8 9 >>> result2['mongo_id' ] = str (result2['_id' ]) >>> del result2['_id' ]>>> es.index(index='rondo-temp-test' , body=result2){'_index' : 'rondo-temp-test' , '_type' : '_doc' , '_id' : 'EFAsaIkBTkcA723hY6eu' , '_version' : 1 , 'result' : 'created' , '_shards' : {'total' : 2 , 'successful' : 2 , 'failed' : 0 }, '_seq_no' : 1 , '_primary_term' : 1 } >>> >>> result3['mongo_id' ] = str (result3['_id' ]) >>> del result3['_id' ]>>> es.index(index='rondo-temp-test' , body=result3){'_index' : 'rondo-temp-test' , '_type' : '_doc' , '_id' : 'dFWOaIkBTkcA723hzm-1' , '_version' : 1 , 'result' : 'created' , '_shards' : {'total' : 2 , 'successful' : 2 , 'failed' : 0 }, '_seq_no' : 5 , '_primary_term' : 1 }
在ES中查询数据, 可以看到, 也能显示时区信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 GET /rondo-temp-test/_doc/EFAsaIkBTkcA723hY6eu { "_index" : "rondo-temp-test" , "_type" : "_doc" , "_id" : "EFAsaIkBTkcA723hY6eu" , "_version" : 1 , "_seq_no" : 1 , "_primary_term" : 1 , "found" : true , "_source" : { "groupId" : "200" , "projectType" : 4 , "version" : "this is a secret" , "role" : "" , "platformBitTypesForNavigation" : 0 , "platformBitTypesForMatching" : 7 , "serverGuid" : "01002405-76BD-4BFC-BCED-8DE4C8541AFA" , "zoneGuid" : "C6385685-74E9-4673-A696-F0EDF7040296" , "zoneExtendType" : 4 , "zoneSearchKey" : "3901862586:1" , "zoneSearchUniqueKey" : "NotUse:38ec66bd-2e6f-4c2f-a0e4-c004879ef467" , "zoneId" : 0 , "ugcId" : "3901862586" , "ugcVersionId" : "1" , "ugcTitle" : "测试数据" , "maxPlayerCount" : 2 , "reservedCount" : 0 , "connectionCount" : 0 , "activeCount" : 0 , "channelNo" : 0 , "enterLimit" : false , "enterOwner" : false , "ownerMemberNo" : 0 , "reservedUsers" : [ ], "enteredUsers" : [ ], "isCheckValid" : true , "createAt" : "2023-06-28T10:54:28.693000+00:00" , "lastTryEnterableCheckingAt" : "2023-06-27T10:54:28.693000+00:00" , "mongo_id" : "649c11644226789ff4f682ae" } }
梳理Main函数 那接下来, 就可以整合出一个完整的python脚本了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 ''' 用于定时监控MongoDB中ZoneService.zones的变化, 并收录到ES by rondochen ''' from pymongo import MongoClientfrom elasticsearch import Elasticsearchfrom time import sleepimport osimport datetimeimport pytzdef main (): myclient = MongoClient(host=os.getenv('mongodb_url' ), tz_aware=True ) es = Elasticsearch(hosts=os.getenv('es_url' )) while True : collect_date = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC) print (collect_date) try : zones = myclient['ZoneService' ]['zones' ].find({}) except : print ('some thing wrong about mongodb query' ) for zone in zones: zone['mongo_id' ] = str (zone['_id' ]) del zone['_id' ] zone['timestamp' ] = collect_date try : print (es.index(index=os.getenv('zone_monitor_index_name' ),body=zone)) print ('----' ) except : print ('something wrong in es' ) sleep(int (os.getenv('check_interval' ) or 30 )) if __name__ == '__main__' : if os.getenv('es_url' ) and os.getenv('mongodb_url' )and os.getenv('zone_monitor_index_name' ): main() else : print ('require environment variable $es_url and $mongodb_url $zone_monitor_index_name' ) exit()
在运行之前, 使用shell的export
命令, 把环境变量设置好, 就可以启动了, 启动起来, 会有这样的输出:
1 2 3 4 5 6 7 8 9 10 # python temp.py 2023-07-18 10:41:24.089384+00:00 {'_index': 'rondo-temp-test', '_type': '_doc', '_id': 'w1WXaIkBTkcA723hKMyT', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 9, '_primary_term': 1} ---- 2023-07-18 10:41:54.150495+00:00 {'_index': 'rondo-temp-test', '_type': '_doc', '_id': 'xVWXaIkBTkcA723hndLq', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 10, '_primary_term': 1} ---- 2023-07-18 10:42:24.187752+00:00 {'_index': 'rondo-temp-test', '_type': '_doc', '_id': '1VWYaIkBTkcA723hE9g_', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 11, '_primary_term': 1} ----
在ES中检查相应的数据, 即可完成验收.
ES优化 写到这里, 可能已经有人忘了, 最初要采集MongoDB中的数据, 就是为了可视化用的. 为了能更好地满足可视化或者是业务跟踪的功能, MongoDB的数据在入库ES的时候, 是很有必要进行一番优化的. 否则, 我们可以预见一些字段类型不统一, 或者是ES查询上的性能/容量问题.
根据笔者的经验, 在规划ES的时候, 主要考虑的是:
shard/replica数量, 一般是ES集群有多少个节点就设置多少个shard, 然后replica设置成1.
压缩, 只要系统性能没有太紧张, 都会设置best_compression
, 节约存储空间.
根据业务场景设置合适的刷新频率
索引生命周期(ILM)
字段类型设置(mapping), 结合实际业务场景考虑
Component Template 在笔者的这个例子中, 做的简单配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 { "template" : { "settings" : { "index" : { "codec" : "best_compression" , "mapping" : { "total_fields" : { "limit" : "2000" } }, "refresh_interval" : "30s" , "number_of_shards" : "3" , "max_docvalue_fields_search" : "200" } }, "mappings" : {...省略...} }, "version" : 20230705 }
ILM 在开发环境中, 一般会设置得比较随意:
如果是生产环境, 会再另外考虑
Index Template 笔者的经验是: 每个环境创建一个带有环境标识的Index Template, 引用同一个Component Template用于规范字段, 再按需引用ILM. 而在引用ILM的时候, 每个环境设置不同的rollover_alias
, 后面带上环境标识.
创建初始索引 因为我们在上面做了ILM的设置, 为了能让索引能正确地rollover, 我们还需要创建一个初始化索引, 类似这样:
1 2 3 4 5 6 PUT zone-monitor-dev-000001 { "aliases": { "zone-monitor-dev": { } } }
这样, 我们的脚本可以向zone-watcher-dev
输入数据, 而ES又可以根据索引的生命周期设置, 自动地触发rollover, 创建后续的zone-monitor-dev-000002
, zone-monitor-dev-000003
等索引.
使用docker部署 脚本开发完了, 也验证可以运行了, 甚至也开始在kibana做一些可视化设置了. 就开始考虑在各个环境部署了, 为了方便, 我们可以很直观地考虑到, 使用docker去部署.
在开始编写dockerfile之前, 先按照主流的做法, 把脚本里面用到的库导出来, 记录在requirements.txt
.
可以使用pipreqs
直接导出项目所需的库:
1 2 3 4 5 6 7 (zones-monitor) [root@VM-111-16-centos tmp]# pip install pipreqs (zones-monitor) [root@VM-111-16-centos tmp]# pipreqs . INFO: Successfully saved requirements file in ./requirements.txt (zones-monitor) [root@VM-111-16-centos tmp]# cat requirements.txt elasticsearch==7.10.0 pymongo==4.3.3 pytz==2023.3
dockerfile 对于这种简单的运维脚本, 几行搞定:
1 2 3 4 5 6 7 from python:3.9 -alpineWORKDIR /app COPY requirements.txt requirements.txt RUN pip3 install -r requirements.txt COPY zone-monitor.py zone-monitor.py CMD ["python3" , "zone-monitor.py" ]
构建命令:
1 docker build -t zone-monitor:1.0 -f zone-monitor.dockerfile .
通过外部文件记录正确的配置内容, 直接启动:
1 2 3 4 5 6 7 # cat dev.env es_url=http://elastic:******@10.3.2.37:9200/ mongodb_url=mongodb://mongouser:xxxxxx@10.3.2.26:27017,10.3.2.4:27017,10.3.2.24:27017/?replicaSet=aaa111 zone_monitor_index_name=zone-monitor-dev check_interval=60 docker run -d --env-file ./dev.env --name zone-monitor-dev zone-monitor:1.0
docker compose 本来做到这里就差不多了, 但在笔者真正工作的项目中, 这个脚本还衍生出了很多雷同的, 监控不同的MongoDB数据库里面的内容, 每个环境都有好几个脚本要启动, 所以就会用到docker compose. 这样, 在给各个环境部署的时候, 只需要拿到docker compose的yaml文件, 再写好正确的环境变量, 就可以很规范地完成启动.
1 2 3 4 5 6 7 8 9 10 services: zone-monitor-dev: build: context: . dockerfile: zone-monitor.dockerfile image: zone-monitor:1.0 env_file: - dev.env container_name: zone-monitor-dev ...省略下面的多个service...
启动命令:
1 docker compose -f docker-compose-dev.yml up -d
后记 通过一次方案设计, 聊了一下笔者的一些工作上的片段. 总的来说, 就是围绕着把MongoDB中的数据收录到ES中 这个需求, 最后衍生出来的一系列工作, 包括:
明确需求
调研可行性
编写代码
反复打磨
部署
优化
这类型的项目, 在后续的维护中, 往往还需要关注随着版本迭代而引起的MongoDB数据变化, ES的存储空间/性能, 以及可视化设置等.
笔者一直认为, 技术都是为具体业务而服务的, 就像本例中这种简单的业务场景, 就没有非要凹造型弄出一些面向对象的复杂代码. 最后进行容器化改造, 也只是为了应付多环境部署这个场景.
本文中涉及的业务场景乃至代码, 对其他人来说估计也没有什么实质的参考价值, 但可以通过整一个流程, 大概感受一下作为业务运维, 在设计一个监控方案时候的一些思路.
参考文档 https://www.elastic.co/guide/en/elasticsearch/reference/7.10/indices-component-template.html
https://pymongo.readthedocs.io/en/stable/examples/datetimes.html
pymongo和ES的很多问题, 官方文档都有答案.