該篇文章為了讓大家儘快看到效果,程式碼放置比較靠前,看程式碼前務必看下第4部分的基礎知識。
資料庫連線池負責分配、管理和釋放資料庫連線,屬於池化機制的一種,類似的還有執行緒池等。
各種池化技術的使用原因都是類似的,也就是單獨操作比較浪費系統資源,利用池提前準備一些資源,在需要時可以重複使用這些預先準備的資源,從而減少系統開銷,實現資源重複利用。對於資料庫連線來關閉來說,需要經過四步:
(1)建立通訊連線的 TCP 三次握手
(2)資料庫伺服器的連線認證
(3)資料庫伺服器關閉連線時的資源回收
(4)斷開通訊連線的 TCP 四次揮手
而利用資料庫連線池則減少了這幾步的系統開銷,更加的高效。
原理類似於線性池,在資料庫連線池中提前建立好多個資料庫連線,使用時從資料庫連線池中取出,使用完放回資料庫連線池。資料庫連線池中的資料庫連線排程由資料庫連線池排程。
Talk is cheap. Show me the code.
直接看程式,原理、函數在後面再介紹。
(0)全部程式碼:
下載之一即可
百度網路硬碟連結:https://pan.baidu.com/s/1wvcLn0CgZxbDpYdapDVUow?pwd=bnd9 提取碼:bnd9
阿里雲盤連線:https://www.aliyundrive.com/s/Emsy9UJLxiv 提取碼: h46t
夸克網路硬碟連結:https://pan.quark.cn/s/58eb69a3f0fb 提取碼:iRiw
(1)MysqlConn.h
#pragma once
#include <iostream>
#include <mysql.h>
#include <chrono>
using namespace std;
using namespace chrono;
class MysqlConn
{
public:
// 初始化資料庫連線
MysqlConn();
// 釋放資料庫連線
~MysqlConn();
// 連線資料庫
bool connect(string user, string passwd, string dbName, string ip, unsigned short port = 3306);
// 更新資料庫: insert, update, delete
bool update(string sql);
// 查詢資料庫
bool query(string sql);
// 遍歷查詢得到的結果集
bool next();
// 得到結果集中指定位置的欄位值
string value(int index);
// 事務操作(提交方式)
bool transaction();
// 提交事務
bool commit();
// 事務回滾
bool rollback();
// 重新整理起始的空閒時間點
void refreshAliveTime();
// 計算連線存活的總時長
long long getAliveTime();
private:
void freeResult();//釋放m_result空間
MYSQL* m_conn = nullptr;
MYSQL_RES* m_result = nullptr;
MYSQL_ROW m_row = nullptr;
steady_clock::time_point m_alivetime;//當前時間點
};
(2)MysqlConn.cpp
#include "MysqlConn.h"
MysqlConn::MysqlConn()
{
m_conn = mysql_init(nullptr);//初始化mysql
mysql_set_character_set(m_conn, "utf8");//設定編碼格式維utf8
}
MysqlConn::~MysqlConn()
{
if (m_conn != nullptr)
{
mysql_close(m_conn);
}
freeResult();
}
bool MysqlConn::connect(string user, string passwd, string dbName, string ip, unsigned short port)
{
MYSQL* ptr = mysql_real_connect(m_conn, ip.c_str(), user.c_str(), passwd.c_str(), dbName.c_str(), port, nullptr, 0);
return ptr != nullptr;
}
bool MysqlConn::update(string sql)
{
if (mysql_query(m_conn, sql.c_str()))
{
return false;
}
return true;
}
bool MysqlConn::query(string sql)
{
freeResult();
if (mysql_query(m_conn, sql.c_str()))
{
return false;
}
m_result = mysql_store_result(m_conn);
return true;
}
bool MysqlConn::next()
{
if (m_result != nullptr)
{
m_row = mysql_fetch_row(m_result); //檢索結果集的下一行,如果沒有要檢索的行,mysql_fetch_row()返回NULL
if (m_row != nullptr)
{
return true;
}
}
return false;
}
string MysqlConn::value(int index)
{
int rowCount = mysql_num_fields(m_result);
if (index >= rowCount || index < 0)
{
return string();
}
char* val = m_row[index];
unsigned long length = mysql_fetch_lengths(m_result)[index];//為了避免下一步在「/0」處被截斷
return string(val, length);
}
bool MysqlConn::transaction()
{
return mysql_autocommit(m_conn, false);//事務提交方式改為手動提交
}
bool MysqlConn::commit()
{
return mysql_commit(m_conn);
}
bool MysqlConn::rollback()
{
return mysql_rollback(m_conn);
}
void MysqlConn::refreshAliveTime()
{
m_alivetime = steady_clock::now();
}
long long MysqlConn::getAliveTime()
{
nanoseconds res = steady_clock::now() - m_alivetime;
milliseconds millsec = duration_cast<milliseconds>(res);
return millsec.count();
}
void MysqlConn::freeResult()
{
if (m_result)
{
mysql_free_result(m_result);
m_result = nullptr;
}
}
(3)ConnectionPool.h
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include "MysqlConn.h"
using namespace std;
class ConnectionPool
{
public:
//單例模式
static ConnectionPool* getConnectPool();
ConnectionPool(const ConnectionPool& obj) = delete;
ConnectionPool& operator=(const ConnectionPool& obj) = delete;
~ConnectionPool();
shared_ptr<MysqlConn> getConnection();//任務從連線池中獲取一個連線
private:
ConnectionPool();//單例模式
bool parseJsonFile();//解析Json組態檔
void produceConnection();//生產新的連線
void recycleConnection();//回收多餘連線
void addConnection();//新增單個連線
//MysqlConn::connect所需要的引數
string m_ip;
string m_user;
string m_passwd;
string m_dbName;
unsigned short m_port;
//連線池的引數
int m_minSize;//最小連線數
int m_maxSize;//最大連線數
int m_timeout;//超時等待時間
int m_maxIdleTime;//待回收執行緒的超時時間
queue<MysqlConn*> m_connectionQ;//任務佇列
mutex m_mutexQ;//互斥鎖
condition_variable m_cond;//條件變數
};
(4)ConnectionPool.cpp
#include "ConnectionPool.h"
#include <json/json.h>
#include <fstream>
#include <thread>
using namespace Json;
ConnectionPool* ConnectionPool::getConnectPool()
{
static ConnectionPool pool;
return &pool;
}
bool ConnectionPool::parseJsonFile()
{
ifstream ifs("dbconf.json");
Reader rd;
Value root;
rd.parse(ifs, root);
if (root.isObject())
{
m_ip = root["ip"].asString();
m_port = root["port"].asInt();
m_user = root["userName"].asString();
m_passwd = root["password"].asString();
m_dbName = root["dbName"].asString();
m_minSize = root["minSize"].asInt();
m_maxSize = root["maxSize"].asInt();
m_maxIdleTime = root["maxIdleTime"].asInt();
m_timeout = root["timeout"].asInt();
return true;
}
return false;
}
void ConnectionPool::produceConnection()
{
while (true)
{
unique_lock<mutex> locker(m_mutexQ);
while (m_connectionQ.size() >= m_minSize)
{
m_cond.wait(locker);
}
addConnection();
m_cond.notify_all();
}
}
void ConnectionPool::recycleConnection()
{
while (true)
{
this_thread::sleep_for(chrono::milliseconds(500));
lock_guard<mutex> locker(m_mutexQ);
while (m_connectionQ.size() > m_minSize)
{
MysqlConn* conn = m_connectionQ.front();
if (conn->getAliveTime() >= m_maxIdleTime)
{
m_connectionQ.pop();
delete conn;
}
else
{
break;
}
}
}
}
void ConnectionPool::addConnection()
{
MysqlConn* conn = new MysqlConn;
conn->connect(m_user, m_passwd, m_dbName, m_ip, m_port);
conn->refreshAliveTime();
m_connectionQ.push(conn);
}
shared_ptr<MysqlConn> ConnectionPool::getConnection()
{
unique_lock<mutex> locker(m_mutexQ);
while (m_connectionQ.empty())
{
if (cv_status::timeout == m_cond.wait_for(locker, chrono::milliseconds(m_timeout)))
{
if (m_connectionQ.empty())
{
//return nullptr;
continue;
}
}
}
shared_ptr<MysqlConn> connptr(m_connectionQ.front(), [this](MysqlConn* conn) {
lock_guard<mutex> locker(m_mutexQ);
conn->refreshAliveTime();
m_connectionQ.push(conn);
});//自定義shared_ptr的解構方法
m_connectionQ.pop();
m_cond.notify_all();
return connptr;
}
ConnectionPool::~ConnectionPool()
{
while (!m_connectionQ.empty())
{
MysqlConn* conn = m_connectionQ.front();
m_connectionQ.pop();
delete conn;
}
}
ConnectionPool::ConnectionPool()
{
// 載入組態檔
if (!parseJsonFile())
{
return;
}
for (int i = 0; i < m_minSize; ++i)
{
addConnection();
}
thread producer(&ConnectionPool::produceConnection, this);
thread recycler(&ConnectionPool::recycleConnection, this);
producer.detach();
recycler.detach();
}
#include <iostream>
#include <memory>
#include "MysqlConn.h"
#include "ConnectionPool.h"
using namespace std;
// 1. 單執行緒: 使用/不使用連線池
// 2. 多執行緒: 使用/不使用連線池
void op1(int begin, int end)
{
for (int i = begin; i < end; ++i)
{
MysqlConn conn;
conn.connect("root", "123159", "testdb", "192.168.237.131");
char sql[1024] = { 0 };
sprintf(sql, "insert into person values(%d, 25, 'man', 'tom')", i);
conn.update(sql);
}
}
void op2(ConnectionPool* pool, int begin, int end)
{
for (int i = begin; i < end; ++i)
{
shared_ptr<MysqlConn> conn = pool->getConnection();
char sql[1024] = { 0 };
sprintf(sql, "insert into person values(%d, 25, 'man', 'tom')", i);
conn->update(sql);
}
}
void test1()
{
#if 1
// 非連線池, 單執行緒, 用時: 21037278300 納秒, 21037 毫秒
steady_clock::time_point begin = steady_clock::now();
op1(0, 5000);
steady_clock::time_point end = steady_clock::now();
auto length = end - begin;
cout << "非連線池, 單執行緒, 用時: " << length.count() << " 納秒, "
<< length.count() / 1000000 << " 毫秒" << endl;
#else
// 連線池, 單執行緒, 用時: 8838406500 納秒, 8838 毫秒
ConnectionPool* pool = ConnectionPool::getConnectPool();
steady_clock::time_point begin = steady_clock::now();
op2(pool, 0, 5000);
steady_clock::time_point end = steady_clock::now();
auto length = end - begin;
cout << "連線池, 單執行緒, 用時: " << length.count() << " 納秒, "
<< length.count() / 1000000 << " 毫秒" << endl;
#endif
}
void test2()
{
#if 0
// 非連線池, 多單執行緒, 用時: 13277417000 納秒, 13277 毫秒
MysqlConn conn;
conn.connect("root", "root", "testdb", "192.168.237.131");
steady_clock::time_point begin = steady_clock::now();
thread t1(op1, 0, 1000);
thread t2(op1, 1000, 2000);
thread t3(op1, 2000, 3000);
thread t4(op1, 3000, 4000);
thread t5(op1, 4000, 5000);
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
steady_clock::time_point end = steady_clock::now();
auto length = end - begin;
cout << "非連線池, 多單執行緒, 用時: " << length.count() << " 納秒, "
<< length.count() / 1000000 << " 毫秒" << endl;
#else
// 連線池, 多單執行緒, 用時: 3938502100 納秒, 3938 毫秒
ConnectionPool* pool = ConnectionPool::getConnectPool();
steady_clock::time_point begin = steady_clock::now();
thread t1(op2, pool, 0, 1000);
thread t2(op2, pool, 1000, 2000);
thread t3(op2, pool, 2000, 3000);
thread t4(op2, pool, 3000, 4000);
thread t5(op2, pool, 4000, 5000);
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
steady_clock::time_point end = steady_clock::now();
auto length = end - begin;
cout << "連線池, 多單執行緒, 用時: " << length.count() << " 納秒, "
<< length.count() / 1000000 << " 毫秒" << endl;
#endif
}
int query()
{
MysqlConn conn;
bool tt = conn.connect("root", "123159", "testdb", "127.0.0.1");
cout << "tt: " << tt << endl;
string sql = "insert into person values(7, 25, 'man', 'tom')";
bool flag = conn.update(sql);
cout << "flag value: " << flag << endl;
sql = "select * from person";
conn.query(sql);
while (conn.next())
{
cout << conn.value(0) << ", "
<< conn.value(1) << ", "
<< conn.value(2) << ", "
<< conn.value(3) << endl;
}
return 0;
}
int main()
{
test2();
return 0;
}
需要設定jsoncpp和mysql
(1)設定jasoncpp
(2)設定mysql
參考:https://www.mysqlzh.com/doc/196/115.html
(1)基本函數
(2)解析Json格式資料
https://www.bilibili.com/video/BV1Fr4y1s7w4
https://subingwen.cn/cpp/dbconnectionPool/