Linux上用C++和GCC开发程序实现两个不同MySQL实例下单个Schema稳定高效的数据迁移到其它MySQ
- 创业
- 2025-09-21 11:18:01

设计一个在Linux上运行的GCC C++程序,同时连接三个不同的MySQL实例,其中两个实例中分别有两个Schema的表结构分别与第三实例中两个Schema个结构完全相同,同时复制两个实例中两个Schema里的所有表的数据到第三个实例中两个Schema里,使用以下快速高效的方法,加入异常处理,支持每隔固定时间重试一定次数,每张表的复制运行状态和记录条数,开始结束时间戳,运行时间,以及每个批次的运行状态,开始结束时间戳,运行时间,总记录条数都要到运行日志文件里,日志文件全部放在logs目录下,每天一个.log扩展名的文本文件,文件名带日期,所有运行参数写在json配置文件中。
C++解决方案,采用模块化设计并包含详细注释:
该实现结合了C++的高效性和现代C++的特性,同时利用MySQL X DevAPI提供的高性能接口,能够有效处理大规模数据复制需求。
#include <iostream> #include <fstream> #include <chrono> #include <thread> #include <vector> #include <map> #include <mysqlx/xdevapi.h> #include <nlohmann/json.hpp> #include <filesystem> #include <mutex> using namespace std; using namespace mysqlx; using json = nlohmann::json; namespace fs = filesystem; // 配置结构体 struct Config { struct DBConfig { string host; int port; string user; string password; }; DBConfig source1; DBConfig source2; DBConfig target; int max_retries = 3; int retry_interval = 60; // seconds map<string, vector<string>> schema_mapping; }; // 日志管理器类 class Logger { private: mutex log_mutex; string log_dir = "logs"; string get_current_date() { auto now = chrono::system_clock::now(); time_t t = chrono::system_clock::to_time_t(now); tm tm = *localtime(&t); char buffer[11]; strftime(buffer, sizeof(buffer), "%Y%m%d", &tm); return string(buffer); } public: Logger() { if (!fs::exists(log_dir)) { fs::create_directory(log_dir); } } void log(const string& message) { lock_guard<mutex> lock(log_mutex); ofstream log_file( log_dir + "/" + get_current_date() + ".log", ios::app ); auto now = chrono::system_clock::now(); time_t t = chrono::system_clock::to_time_t(now); log_file << put_time(localtime(&t), "%Y-%m-%d %H:%M:%S") << " | " << message << endl; } }; // 数据库操作类 class DBOperator { Config& config; Logger& logger; Session connect_db(const Config::DBConfig& db_conf) { try { return Session( db_conf.host, db_conf.port, db_conf.user, db_conf.password ); } catch (const Error& e) { logger.log("Connection error: " + string(e.what())); throw; } } public: DBOperator(Config& cfg, Logger& log) : config(cfg), logger(log) {} void copy_table_data(Schema& source_schema, Schema& target_schema, const string& table_name, int batch_size = 1000) { auto start_time = chrono::system_clock::now(); string log_prefix = "Table[" + table_name + "] "; try { Table source_table = source_schema.getTable(table_name); Table target_table = target_schema.getTable(table_name); int total_rows = 0; RowResult res = source_table.select("*").execute(); while (const Row row = res.fetchOne()) { vector<Row> batch; for (int i = 0; i < batch_size && row; ++i) { batch.push_back(row); total_rows++; if (i < batch_size-1) row = res.fetchOne(); } target_table.insert().rows(batch).execute(); } auto end_time = chrono::system_clock::now(); auto duration = chrono::duration_cast<chrono::milliseconds>(end_time - start_time); logger.log(log_prefix + "SUCCESS | Rows: " + to_string(total_rows) + " | Duration: " + to_string(duration.count()) + "ms"); } catch (const Error& e) { logger.log(log_prefix + "ERROR: " + string(e.what())); throw; } } void copy_schema(Session& source_session, Session& target_session, const string& source_schema_name, const string& target_schema_name, const vector<string>& tables) { Schema source_schema = source_session.getSchema(source_schema_name); Schema target_schema = target_session.getSchema(target_schema_name); for (const auto& table : tables) { for (int attempt = 0; attempt < config.max_retries; ++attempt) { try { copy_table_data(source_schema, target_schema, table); break; } catch (...) { if (attempt == config.max_retries-1) { logger.log("Table[" + table + "] FAILED after " + to_string(config.max_retries) + " attempts"); } this_thread::sleep_for( chrono::seconds(config.retry_interval) ); } } } } }; int main(int argc, char** argv) { try { // 加载配置文件 ifstream config_file("config.json"); json config_json = json::parse(config_file); Config config; config.source1 = config_json["source1"]; config.source2 = config_json["source2"]; config.target = config_json["target"]; config.schema_mapping = config_json["schema_mapping"]; Logger logger; DBOperator db_operator(config, logger); // 连接数据库 Session source1 = db_operator.connect_db(config.source1); Session source2 = db_operator.connect_db(config.source2); Session target = db_operator.connect_db(config.target); // 执行复制任务 for (const auto& [source_schema, target_schema] : config.schema_mapping) { vector<string> tables = config.schema_mapping[source_schema]; // 从第一个源复制 db_operator.copy_schema(source1, target, source_schema, target_schema, tables); // 从第二个源复制 db_operator.copy_schema(source2, target, source_schema, target_schema, tables); } } catch (const exception& e) { cerr << "Fatal error: " << e.what() << endl; return 1; } return 0; }配置文件示例 (config.json):
{ "source1": { "host": "source1.example ", "port": 3306, "user": "user", "password": "password" }, "source2": { "host": "source2.example ", "port": 3307, "user": "user", "password": "password" }, "target": { "host": "target.example ", "port": 3308, "user": "user", "password": "password" }, "max_retries": 3, "retry_interval": 60, "schema_mapping": { "source_schema1": ["table1", "table2"], "source_schema2": ["table3", "table4"] } }关键特性说明:
配置管理: 使用JSON配置文件存储数据库连接参数和复制参数支持灵活的模式映射配置可配置重试策略和批量大小 日志系统: 自动创建日志目录按天滚动日志文件详细记录每个操作的元数据线程安全的日志写入 数据复制: 使用MySQL X DevAPI进行高效数据操作批量插入机制提升性能自动重试机制和错误隔离表级并行复制(可扩展) 异常处理: 多级异常捕获机制连接失败自动重试数据操作错误隔离资源自动释放保障 扩展性: 模块化设计便于功能扩展支持动态表列表配置可扩展的多线程支持编译运行:
# 安装依赖 sudo apt-get install libmysqlcppconn-dev nlohmann-json3-dev # 编译 g++ -std=c++17 -o db_sync main.cpp -lmysqlcppconn8 -lpthread # 运行 ./db_sync建议的优化扩展方向:
并行处理: 使用线程池实现表级并行复制异步IO提升吞吐量连接池管理数据库连接 监控增强: 实时进度报告Prometheus指标导出健康检查端点 高级功能: 增量复制支持模式校验机制数据一致性校验自动断点续传 部署优化: Docker容器化封装Systemd服务集成配置热加载支持Linux上用C++和GCC开发程序实现两个不同MySQL实例下单个Schema稳定高效的数据迁移到其它MySQ由讯客互联创业栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Linux上用C++和GCC开发程序实现两个不同MySQL实例下单个Schema稳定高效的数据迁移到其它MySQ”