基於C++11的資料庫連線池實現

2022-05-25 06:01:54

0.注意

該篇文章為了讓大家儘快看到效果,程式碼放置比較靠前,看程式碼前務必看下第4部分的基礎知識。

1.資料庫連線池

1.1 是什麼?

資料庫連線池負責分配、管理和釋放資料庫連線,屬於池化機制的一種,類似的還有執行緒池等。

1.2 為什麼用?

各種池化技術的使用原因都是類似的,也就是單獨操作比較浪費系統資源,利用池提前準備一些資源,在需要時可以重複使用這些預先準備的資源,從而減少系統開銷,實現資源重複利用。對於資料庫連線來關閉來說,需要經過四步:
(1)建立通訊連線的 TCP 三次握手
(2)資料庫伺服器的連線認證
(3)資料庫伺服器關閉連線時的資源回收
(4)斷開通訊連線的 TCP 四次揮手
而利用資料庫連線池則減少了這幾步的系統開銷,更加的高效。

1.3如何設計?

原理類似於線性池,在資料庫連線池中提前建立好多個資料庫連線,使用時從資料庫連線池中取出,使用完放回資料庫連線池。資料庫連線池中的資料庫連線排程由資料庫連線池排程。

2.基於C++11的實現

Talk is cheap. Show me the code.

直接看程式,原理、函數在後面再介紹。

2.1程式

(0)全部程式碼:
下載之一即可
百度網路硬碟連結:https://pan.baidu.com/s/1wvcLn0CgZxbDpYdapDVUow?pwd=bnd9 提取碼:bnd9
阿里雲盤連線:https://www.aliyundrive.com/s/Emsy9UJLxiv 提取碼: h46t
夸克網路硬碟連結:https://pan.quark.cn/s/58eb69a3f0fb 提取碼:iRiw
(1)MysqlConn.h

#pragma once
#include <iostream>
#include <mysql.h>
#include <chrono>
using namespace std;
using namespace chrono;
class MysqlConn
{
public:
    // 初始化資料庫連線
    MysqlConn();
    // 釋放資料庫連線
    ~MysqlConn();
    // 連線資料庫
    bool connect(string user, string passwd, string dbName, string ip, unsigned short port = 3306);
    // 更新資料庫: insert, update, delete
    bool update(string sql);
    // 查詢資料庫
    bool query(string sql);
    // 遍歷查詢得到的結果集
    bool next();
    // 得到結果集中指定位置的欄位值
    string value(int index);
    // 事務操作(提交方式)
    bool transaction();
    // 提交事務
    bool commit();
    // 事務回滾 
    bool rollback();
    // 重新整理起始的空閒時間點
    void refreshAliveTime();
    // 計算連線存活的總時長
    long long getAliveTime();
private:
    void freeResult();//釋放m_result空間
    MYSQL* m_conn = nullptr;
    MYSQL_RES* m_result = nullptr;
    MYSQL_ROW m_row = nullptr;
    steady_clock::time_point m_alivetime;//當前時間點
};

(2)MysqlConn.cpp

#include "MysqlConn.h"

MysqlConn::MysqlConn()
{
    m_conn = mysql_init(nullptr);//初始化mysql
    mysql_set_character_set(m_conn, "utf8");//設定編碼格式維utf8
}

MysqlConn::~MysqlConn()
{
    if (m_conn != nullptr)
    {
        mysql_close(m_conn);
    }
    freeResult();
}

bool MysqlConn::connect(string user, string passwd, string dbName, string ip, unsigned short port)
{
    MYSQL* ptr = mysql_real_connect(m_conn, ip.c_str(), user.c_str(), passwd.c_str(), dbName.c_str(), port, nullptr, 0);
    return ptr != nullptr;
}

bool MysqlConn::update(string sql)
{
    if (mysql_query(m_conn, sql.c_str()))
    {
        return false;
    }
    return true;
}

bool MysqlConn::query(string sql)
{
    freeResult();
    if (mysql_query(m_conn, sql.c_str()))
    {
        return false;
    }
    m_result = mysql_store_result(m_conn);
    return true;
}

bool MysqlConn::next()
{
    if (m_result != nullptr)
    {
        m_row = mysql_fetch_row(m_result); //檢索結果集的下一行,如果沒有要檢索的行,mysql_fetch_row()返回NULL
        if (m_row != nullptr)
        {
            return true;
        }
    }
    return false;
}

string MysqlConn::value(int index)
{
    int rowCount = mysql_num_fields(m_result);
    if (index >= rowCount || index < 0)
    {
        return string();
    }
    char* val = m_row[index];
    unsigned long length = mysql_fetch_lengths(m_result)[index];//為了避免下一步在「/0」處被截斷
    return string(val, length);
}

bool MysqlConn::transaction()
{
    return mysql_autocommit(m_conn, false);//事務提交方式改為手動提交
}

bool MysqlConn::commit()
{
    return mysql_commit(m_conn);
}

bool MysqlConn::rollback()
{
    return mysql_rollback(m_conn);
}

void MysqlConn::refreshAliveTime()
{
    m_alivetime = steady_clock::now();
}

long long MysqlConn::getAliveTime()
{
    nanoseconds res = steady_clock::now() - m_alivetime;
    milliseconds millsec = duration_cast<milliseconds>(res);
    return millsec.count();
}

void MysqlConn::freeResult()
{
    if (m_result)
    {
        mysql_free_result(m_result);
        m_result = nullptr;
    }
}

(3)ConnectionPool.h

#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include "MysqlConn.h"
using namespace std;
class ConnectionPool
{
public:
    //單例模式
    static ConnectionPool* getConnectPool();
    ConnectionPool(const ConnectionPool& obj) = delete;
    ConnectionPool& operator=(const ConnectionPool& obj) = delete;
    ~ConnectionPool();

    shared_ptr<MysqlConn> getConnection();//任務從連線池中獲取一個連線
    
private:
    ConnectionPool();//單例模式

    bool parseJsonFile();//解析Json組態檔
    void produceConnection();//生產新的連線
    void recycleConnection();//回收多餘連線
    void addConnection();//新增單個連線

    //MysqlConn::connect所需要的引數
    string m_ip;
    string m_user;
    string m_passwd;
    string m_dbName;
    unsigned short m_port;

    //連線池的引數
    int m_minSize;//最小連線數
    int m_maxSize;//最大連線數
    int m_timeout;//超時等待時間
    int m_maxIdleTime;//待回收執行緒的超時時間

    queue<MysqlConn*> m_connectionQ;//任務佇列
    mutex m_mutexQ;//互斥鎖
    condition_variable m_cond;//條件變數
};

(4)ConnectionPool.cpp

#include "ConnectionPool.h"
#include <json/json.h>
#include <fstream>
#include <thread>
using namespace Json;
ConnectionPool* ConnectionPool::getConnectPool()
{
    static ConnectionPool pool;
    return &pool;
}

bool ConnectionPool::parseJsonFile()
{
    ifstream ifs("dbconf.json");
    Reader rd;
    Value root;
    rd.parse(ifs, root);
    if (root.isObject())
    {
        m_ip = root["ip"].asString();
        m_port = root["port"].asInt();
        m_user = root["userName"].asString();
        m_passwd = root["password"].asString();
        m_dbName = root["dbName"].asString();
        m_minSize = root["minSize"].asInt();
        m_maxSize = root["maxSize"].asInt();
        m_maxIdleTime = root["maxIdleTime"].asInt();
        m_timeout = root["timeout"].asInt();
        return true;
    }
    return false;
}

void ConnectionPool::produceConnection()
{
    while (true)
    {
        unique_lock<mutex> locker(m_mutexQ);
        while (m_connectionQ.size() >= m_minSize)
        {
            m_cond.wait(locker);
        }
        addConnection();
        m_cond.notify_all();
    }
}

void ConnectionPool::recycleConnection()
{
    while (true)
    {
        this_thread::sleep_for(chrono::milliseconds(500));
        lock_guard<mutex> locker(m_mutexQ);
        while (m_connectionQ.size() > m_minSize)
        {
            MysqlConn* conn = m_connectionQ.front();
            if (conn->getAliveTime() >= m_maxIdleTime)
            {
                m_connectionQ.pop();
                delete conn;
            }
            else
            {
                break;
            }
        }
    }
}

void ConnectionPool::addConnection()
{
    MysqlConn* conn = new MysqlConn;
    conn->connect(m_user, m_passwd, m_dbName, m_ip, m_port);
    conn->refreshAliveTime();
    m_connectionQ.push(conn);
}

shared_ptr<MysqlConn> ConnectionPool::getConnection()
{
    unique_lock<mutex> locker(m_mutexQ);
    while (m_connectionQ.empty())
    {
        if (cv_status::timeout == m_cond.wait_for(locker, chrono::milliseconds(m_timeout)))
        {
            if (m_connectionQ.empty())
            {
                //return nullptr;
                continue;
            }
        }
    }
    shared_ptr<MysqlConn> connptr(m_connectionQ.front(), [this](MysqlConn* conn) {
        lock_guard<mutex> locker(m_mutexQ);
        conn->refreshAliveTime();
        m_connectionQ.push(conn);
        });//自定義shared_ptr的解構方法
    m_connectionQ.pop();
    m_cond.notify_all();
    return connptr;
}

ConnectionPool::~ConnectionPool()
{
    while (!m_connectionQ.empty())
    {
        MysqlConn* conn = m_connectionQ.front();
        m_connectionQ.pop();
        delete conn;
    }
}

ConnectionPool::ConnectionPool()
{
    // 載入組態檔
    if (!parseJsonFile())
    {
        return;
    }

    for (int i = 0; i < m_minSize; ++i)
    {
        addConnection();
    }
    thread producer(&ConnectionPool::produceConnection, this);
    thread recycler(&ConnectionPool::recycleConnection, this);
    producer.detach();
    recycler.detach();
}

2.2 測試程式碼main.cpp

#include <iostream>
#include <memory>
#include "MysqlConn.h"
#include "ConnectionPool.h"
using namespace std;
// 1. 單執行緒: 使用/不使用連線池
// 2. 多執行緒: 使用/不使用連線池

void op1(int begin, int end)
{
    for (int i = begin; i < end; ++i)
    {
        MysqlConn conn;
        conn.connect("root", "123159", "testdb", "192.168.237.131");
        char sql[1024] = { 0 };
        sprintf(sql, "insert into person values(%d, 25, 'man', 'tom')", i);
        conn.update(sql);
    }
}

void op2(ConnectionPool* pool, int begin, int end)
{
    for (int i = begin; i < end; ++i)
    {
        shared_ptr<MysqlConn> conn = pool->getConnection();
        char sql[1024] = { 0 };
        sprintf(sql, "insert into person values(%d, 25, 'man', 'tom')", i);
        conn->update(sql);
    }
}

void test1()
{
#if 1
    // 非連線池, 單執行緒, 用時: 21037278300 納秒, 21037 毫秒
    steady_clock::time_point begin = steady_clock::now();
    op1(0, 5000);
    steady_clock::time_point end = steady_clock::now();
    auto length = end - begin;
    cout << "非連線池, 單執行緒, 用時: " << length.count() << " 納秒, "
        << length.count() / 1000000 << " 毫秒" << endl;
#else
    // 連線池, 單執行緒, 用時: 8838406500 納秒, 8838 毫秒
    ConnectionPool* pool = ConnectionPool::getConnectPool();
    steady_clock::time_point begin = steady_clock::now();
    op2(pool, 0, 5000);
    steady_clock::time_point end = steady_clock::now();
    auto length = end - begin;
    cout << "連線池, 單執行緒, 用時: " << length.count() << " 納秒, "
        << length.count() / 1000000 << " 毫秒" << endl;

#endif
}

void test2()
{
#if 0
    // 非連線池, 多單執行緒, 用時: 13277417000 納秒, 13277 毫秒
    MysqlConn conn;
    conn.connect("root", "root", "testdb", "192.168.237.131");
    steady_clock::time_point begin = steady_clock::now();
    thread t1(op1, 0, 1000);
    thread t2(op1, 1000, 2000);
    thread t3(op1, 2000, 3000);
    thread t4(op1, 3000, 4000);
    thread t5(op1, 4000, 5000);
    t1.join();
    t2.join();
    t3.join();
    t4.join();
    t5.join();
    steady_clock::time_point end = steady_clock::now();
    auto length = end - begin;
    cout << "非連線池, 多單執行緒, 用時: " << length.count() << " 納秒, "
        << length.count() / 1000000 << " 毫秒" << endl;

#else
    // 連線池, 多單執行緒, 用時: 3938502100 納秒, 3938 毫秒
    ConnectionPool* pool = ConnectionPool::getConnectPool();
    steady_clock::time_point begin = steady_clock::now();
    thread t1(op2, pool, 0, 1000);
    thread t2(op2, pool, 1000, 2000);
    thread t3(op2, pool, 2000, 3000);
    thread t4(op2, pool, 3000, 4000);
    thread t5(op2, pool, 4000, 5000);
    t1.join();
    t2.join();
    t3.join();
    t4.join();
    t5.join();
    steady_clock::time_point end = steady_clock::now();
    auto length = end - begin;
    cout << "連線池, 多單執行緒, 用時: " << length.count() << " 納秒, "
        << length.count() / 1000000 << " 毫秒" << endl;

#endif
}

int query()
{
    MysqlConn conn;
    bool tt = conn.connect("root", "123159", "testdb", "127.0.0.1");
    cout << "tt:  " << tt << endl;
    string sql = "insert into person values(7, 25, 'man', 'tom')";
    bool flag = conn.update(sql);
    cout << "flag value:  " << flag << endl;

    sql = "select * from person";
    conn.query(sql);
    while (conn.next())
    {
        cout << conn.value(0) << ", "
            << conn.value(1) << ", "
            << conn.value(2) << ", "
            << conn.value(3) << endl;
    }
    return 0;
}
int main()
{
    test2();
    return 0;
}

3.設定

需要設定jsoncpp和mysql
(1)設定jasoncpp

(2)設定mysql

4.基礎知識

4.1MySQL在C語言中的API

參考:https://www.mysqlzh.com/doc/196/115.html

4.2 jsoncpp的基本知識

(1)基本函數

(2)解析Json格式資料

5.參考

https://www.bilibili.com/video/BV1Fr4y1s7w4
https://subingwen.cn/cpp/dbconnectionPool/