DataX mongodb导出数据到mysql

Python publisher01 87℃

Python版本要为2
cmd乱码解决:输入CHCP 65001
数据库中的中文数据乱码解决:在json文件的jdbcUrl项末尾加上 ?characterEncoding=utf8

DataX介绍

安装DataX

DataX下载地址
下载完成解压至某个路径下即可

查看配置模板

python datax.py -r {YOUR_READER} -w {YOUR_WRITER}

例如mysql:

C:\DataX\bin>python datax.py -r mysqlreader -w mysqlwriter
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
Please refer to the mysqlreader document:
     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
Please refer to the mysqlwriter document:
     https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": [],
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": "",
                                "table": []
                            }
                        ],
                        "password": "",
                        "preSql": [],
                        "session": [],
                        "username": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

配置mongodb2mysql.json文件

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mongodbreader",
                    "parameter": {
                        "address": ["*.*.*.*:27017"],
                        "userName": "root",
                        "userPassword": "123456",
                        "dbName": "weixin",
                        "collectionName": "fileids_wxpy",
                        "column": [
                            {
                                "index": 0,
                                "name": "_id",
                                "type": "string"
                            },
                            {
                                "index": 1,
                                "name": "crawler_time",
                                "type": "string"
                            },
                            {
                                "index": 2,
                                "name": "file_url",
                                "type": "string"
                            },
                            {
                                "index": 3,
                                "name": "flag",
                                "type": "string"
                            },
                            {
                                "index": 4,
                                "name": "logo_url",
                                "type": "string"
                            },
                            {
                                "index": 5,
                                "name": "source",
                                "type": "string"
                            },
                            {
                                "index": 6,
                                "name": "update_date",
                                "type": "string"
                            },
                            {
                                "index": 7,
                                "name": "update_time",
                                "type": "long"
                            },
                            {
                                "index": 8,
                                "name": "wx_id",
                                "type": "string"
                            },
                            {
                                "index": 9,
                                "name": "wx_name",
                                "type": "string"
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "crawler_time",
                            "file_url",
                            "flag",
                            "logo_url",
                            "source",
                            "update_date",
                            "update_time",
                            "wx_id",
                            "wx_name"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://*.*.*.*:3306/weixin?characterEncoding=utf8",
                                "table": ["fileids_wxpy"]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

MySQL新建数据库、表

-- Create the target database and make it the current schema.
CREATE DATABASE weixin;
USE weixin;

-- Rebuild the destination table from scratch: any existing copy is dropped first.
-- Column names and order match the mysqlwriter "column" list in mongodb2mysql.json.
DROP TABLE IF EXISTS `fileids_wxpy`;
CREATE TABLE `fileids_wxpy` (
    `id`           BIGINT(20) UNSIGNED NOT NULL,
    `crawler_time` INT(10) UNSIGNED    NOT NULL,
    `file_url`     VARCHAR(255)        NOT NULL,
    `flag`         VARCHAR(255)        NOT NULL,
    `logo_url`     VARCHAR(255)        NOT NULL,
    `source`       VARCHAR(255)        NOT NULL,
    `update_date`  INT(10) UNSIGNED    NOT NULL,
    `update_time`  INT(10) UNSIGNED    NOT NULL,
    `wx_id`        VARCHAR(255)        NOT NULL,
    `wx_name`      VARCHAR(255)        NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8;

启动

C:\DataX\bin>python datax.py mongodb2mysql.json

报错

Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster state is {type=UNKNOWN, servers=[{address=*.*.*.*:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=null, userName='root', source='weixin', password=<hidden>, mechanismProperties={}}}, caused by {com.mongodb.MongoCommandException: Command failed with error 18: 'Authentication failed.' on server*.*.*.*:27017. The full response is { "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" : "AuthenticationFailed" }}}]
        at com.mongodb.connection.BaseCluster.createTimeoutException(BaseCluster.java:369)
        at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:101)
        at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75)
        at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71)
        at com.mongodb.binding.ClusterBinding.getReadConnectionSource(ClusterBinding.java:63)
        at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:201)
        at com.mongodb.operation.CountOperation.execute(CountOperation.java:206)
        at com.mongodb.operation.CountOperation.execute(CountOperation.java:53)
        at com.mongodb.Mongo.execute(Mongo.java:772)
        at com.mongodb.Mongo$2.execute(Mongo.java:759)
        at com.mongodb.MongoCollectionImpl.count(MongoCollectionImpl.java:185)
        at com.mongodb.MongoCollectionImpl.count(MongoCollectionImpl.java:165)
        at com.alibaba.datax.plugin.reader.mongodbreader.util.CollectionSplitUtil.doSplitInterval(CollectionSplitUtil.java:55)
        at com.alibaba.datax.plugin.reader.mongodbreader.util.CollectionSplitUtil.doSplit(CollectionSplitUtil.java:37)
        at com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReader$Job.split(MongoDBReader.java:37)
        at com.alibaba.datax.core.job.JobContainer.doReaderSplit(JobContainer.java:732)
        at com.alibaba.datax.core.job.JobContainer.split(JobContainer.java:393)
        at com.alibaba.datax.core.job.JobContainer.start(JobContainer.java:117)
        ... 3 more

原因

MongoDB中每个数据库之间是相互独立的,都有各自独立的权限。正确的做法是使用root账号在【将要操作的数据库】中创建一个【子账号】,再用这个子账号连接mongo

解决办法

>use admin
switched to db admin
>db.auth("root","******")
1
>show dbs
admin
local
weixin
>use weixin
switched to db weixin
>db.createUser(
        {
            user:"DataXTest",
            pwd:"123456",
            roles:[{role:"dbOwner",db:"weixin"}]
        }
)
Successfully added user: {
    "user" : "DataXTest",
    "roles" : [
        {
            "role" : "dbOwner",
            "db" : "weixin"
        }
    ]
}

使用DataXTest来替换json配置文件中mongodb的账号root后,再次运行

C:\DataX\bin>python datax.py mongodb2mysql.json
2019-06-13 14:39:40.218 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-06-13 14:39:40.219 [job-0] INFO  StandAloneJobContainerCommunicator - Total 50115 records, 17716504 bytes | Speed 36.04KB/s, 104 records/s | Error 12 records, 3513 bytes |  All Task WaitWriterTime 259.684s |  All Task WaitReaderTime 207.041s | Percentage 100.00%
2019-06-13 14:39:40.221 [job-0] INFO  JobContainer -
任务启动时刻                    : 2019-06-13 14:31:33
任务结束时刻                    : 2019-06-13 14:39:40
任务总计耗时                    :                487s
任务平均流量                    :           36.04KB/s
记录写入速度                    :            104rec/s
读出记录总数                    :               50115
读写失败总数                    :                  12

注: 此处错误的12条记录是由于id 长度超过19位

转载请注明:Python量化投资 » DataX mongodb导出数据到mysql

喜欢 (0)or分享 (0)