druid的keepalive機制源碼解析
DruidDataSource
public class DruidDataSource extends DruidAbstractDataSource implements DruidDataSourceMBean, ManagedDataSource, Referenceable, Closeable, Cloneable, ConnectionPoolDataSource, MBeanRegistration {
private int keepAliveCheckCount = 0;
private DruidConnectionHolder[] keepAliveConnections;
private volatile boolean keepAlive = false;
// from DruidAbstractDataSource
protected volatile long keepAliveBetweenTimeMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS * 2;
public static final long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = 60 * 1000L;
public void init() throws SQLException {
//......
if (keepAlive) {
// async fill to minIdle
if (createScheduler != null) {
for (int i = 0; i < minIdle; ++i) {
submitCreateTask(true);
}
} else {
this.emptySignal();
}
}
//......
}
}DruidDataSource的init方法在keepAlive的時候觸發(fā)創(chuàng)建連接,當createScheduler不為null時(默認為null)執(zhí)行submitCreateTask,否則執(zhí)行emptySignal
submitCreateTask
com/alibaba/druid/pool/DruidDataSource.java
private void submitCreateTask(boolean initTask) {
createTaskCount++;
CreateConnectionTask task = new CreateConnectionTask(initTask);
if (createTasks == null) {
createTasks = new long[8];
}
boolean putted = false;
for (int i = 0; i < createTasks.length; ++i) {
if (createTasks[i] == 0) {
createTasks[i] = task.taskId;
putted = true;
break;
}
}
if (!putted) {
long[] array = new long[createTasks.length * 3 / 2];
System.arraycopy(createTasks, 0, array, 0, createTasks.length);
array[createTasks.length] = task.taskId;
createTasks = array;
}
this.createSchedulerFuture = createScheduler.submit(task);
}submitCreateTask創(chuàng)建CreateConnectionTask,然后提交到createScheduler
CreateConnectionTask
com/alibaba/druid/pool/DruidDataSource.java
public class CreateConnectionTask implements Runnable {
private int errorCount;
private boolean initTask;
private final long taskId;
public CreateConnectionTask() {
taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
}
public CreateConnectionTask(boolean initTask) {
taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
this.initTask = initTask;
}
@Override
public void run() {
runInternal();
}
private void runInternal() {
for (; ; ) {
// addLast
lock.lock();
try {
if (closed || closing) {
clearCreateTask(taskId);
return;
}
boolean emptyWait = true;
if (createError != null && poolingCount == 0) {
emptyWait = false;
}
if (emptyWait) {
// 必須存在線程等待,才創(chuàng)建連接
if (poolingCount >= notEmptyWaitThreadCount //
&& (!(keepAlive && activeCount + poolingCount < minIdle)) // 在keepAlive場景不能放棄創(chuàng)建
&& (!initTask) // 線程池初始化時的任務不能放棄創(chuàng)建
&& !isFailContinuous() // failContinuous時不能放棄創(chuàng)建,否則會無法創(chuàng)建線程
&& !isOnFatalError() // onFatalError時不能放棄創(chuàng)建,否則會無法創(chuàng)建線程
) {
clearCreateTask(taskId);
return;
}
// 防止創(chuàng)建超過maxActive數(shù)量的連接
if (activeCount + poolingCount >= maxActive) {
clearCreateTask(taskId);
return;
}
}
} finally {
lock.unlock();
}
PhysicalConnectionInfo physicalConnection = null;
try {
physicalConnection = createPhysicalConnection();
} catch (OutOfMemoryError e) {
LOG.error("create connection OutOfMemoryError, out memory. ", e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// fail over retry attempts
setFailContinuous(true);
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
if (breakAfterAcquireFailure) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
this.errorCount = 0; // reset errorCount
if (closing || closed) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
return;
}
} catch (SQLException e) {
LOG.error("create connection SQLException, url: " + jdbcUrl, e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// fail over retry attempts
setFailContinuous(true);
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
if (breakAfterAcquireFailure) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
this.errorCount = 0; // reset errorCount
if (closing || closed) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
return;
}
} catch (RuntimeException e) {
LOG.error("create connection RuntimeException", e);
// unknow fatal exception
setFailContinuous(true);
continue;
} catch (Error e) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
LOG.error("create connection Error", e);
// unknow fatal exception
setFailContinuous(true);
break;
} catch (Throwable e) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
LOG.error("create connection unexecpted error.", e);
break;
}
if (physicalConnection == null) {
continue;
}
physicalConnection.createTaskId = taskId;
boolean result = put(physicalConnection);
if (!result) {
JdbcUtils.close(physicalConnection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}
break;
}
}
}CreateConnectionTask主要是創(chuàng)建physicalConnection,然后放到connections中。在emptyWait為true的時候會根據(jù)條件執(zhí)行empty.await()
CreateConnectionThread
public class CreateConnectionThread extends Thread {
public CreateConnectionThread(String name) {
super(name);
this.setDaemon(true);
}
public void run() {
initedLatch.countDown();
long lastDiscardCount = 0;
int errorCount = 0;
for (; ; ) {
// addLast
try {
lock.lockInterruptibly();
} catch (InterruptedException e2) {
break;
}
long discardCount = DruidDataSource.this.discardCount;
boolean discardChanged = discardCount - lastDiscardCount > 0;
lastDiscardCount = discardCount;
try {
boolean emptyWait = true;
if (createError != null
&& poolingCount == 0
&& !discardChanged) {
emptyWait = false;
}
if (emptyWait
&& asyncInit && createCount < initialSize) {
emptyWait = false;
}
if (emptyWait) {
// 必須存在線程等待,才創(chuàng)建連接
if (poolingCount >= notEmptyWaitThreadCount //
&& (!(keepAlive && activeCount + poolingCount < minIdle))
&& !isFailContinuous()
) {
empty.await();
}
// 防止創(chuàng)建超過maxActive數(shù)量的連接
if (activeCount + poolingCount >= maxActive) {
empty.await();
continue;
}
}
} catch (InterruptedException e) {
lastCreateError = e;
lastErrorTimeMillis = System.currentTimeMillis();
if ((!closing) && (!closed)) {
LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);
}
break;
} finally {
lock.unlock();
}
PhysicalConnectionInfo connection = null;
try {
connection = createPhysicalConnection();
} catch (SQLException e) {
LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
+ ", state " + e.getSQLState(), e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// fail over retry attempts
setFailContinuous(true);
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
if (breakAfterAcquireFailure) {
break;
}
try {
Thread.sleep(timeBetweenConnectErrorMillis);
} catch (InterruptedException interruptEx) {
break;
}
}
} catch (RuntimeException e) {
LOG.error("create connection RuntimeException", e);
setFailContinuous(true);
continue;
} catch (Error e) {
LOG.error("create connection Error", e);
setFailContinuous(true);
break;
}
if (connection == null) {
continue;
}
boolean result = put(connection);
if (!result) {
JdbcUtils.close(connection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}
errorCount = 0; // reset errorCount
if (closing || closed) {
break;
}
}
}
}CreateConnectionThread的邏輯與CreateConnectionTask有點類似,有不少重復的代碼,不像是同一個人寫的;CreateConnectionThread是在DruidDataSource的init方法中觸發(fā)createAndStartCreatorThread執(zhí)行的,看只執(zhí)行一次
shrink
public void shrink(boolean checkTime, boolean keepAlive) {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}
boolean needFill = false;
int evictCount = 0;
int keepAliveCount = 0;
int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
fatalErrorCountLastShrink = fatalErrorCount;
try {
if (!inited) {
return;
}
final int checkCount = poolingCount - minIdle;
final long currentTimeMillis = System.currentTimeMillis();
for (int i = 0; i < poolingCount; ++i) {
DruidConnectionHolder connection = connections[i];
if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
keepAliveConnections[keepAliveCount++] = connection;
continue;
}
if (checkTime) {
if (phyTimeoutMillis > 0) {
long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis
) {
break;
}
if (idleMillis >= minEvictableIdleTimeMillis) {
if (checkTime && i < checkCount) {
evictConnections[evictCount++] = connection;
continue;
} else if (idleMillis > maxEvictableIdleTimeMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
keepAliveConnections[keepAliveCount++] = connection;
}
} else {
if (i < checkCount) {
evictConnections[evictCount++] = connection;
} else {
break;
}
}
}
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
poolingCount -= removeCount;
}
keepAliveCheckCount += keepAliveCount;
if (keepAlive && poolingCount + activeCount < minIdle) {
needFill = true;
}
} finally {
lock.unlock();
}
if (evictCount > 0) {
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
}
Arrays.fill(evictConnections, null);
}
if (keepAliveCount > 0) {
// keep order
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();
boolean validate = false;
try {
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
// skip
}
boolean discard = !validate;
if (validate) {
holer.lastKeepTimeMillis = System.currentTimeMillis();
boolean putOk = put(holer, 0L, true);
if (!putOk) {
discard = true;
}
}
if (discard) {
try {
connection.close();
} catch (Exception e) {
// skip
}
lock.lock();
try {
discardCount++;
if (activeCount + poolingCount <= minIdle) {
emptySignal();
}
} finally {
lock.unlock();
}
}
}
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
Arrays.fill(keepAliveConnections, null);
}
if (needFill) {
lock.lock();
try {
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
for (int i = 0; i < fillCount; ++i) {
emptySignal();
}
} finally {
lock.unlock();
}
} else if (onFatalError || fatalErrorIncrement > 0) {
lock.lock();
try {
emptySignal();
} finally {
lock.unlock();
}
}
}DestroyConnectionThread就是每隔timeBetweenEvictionRunsMillis執(zhí)行一下destroyTask,而DestroyTask的run方法主要是執(zhí)行shrink(true, keepAlive)
shrink方法會根據(jù)poolingCount遍歷connections,在checkTime為true時會根據(jù)idleMillis判斷是否需要evict,否則判斷是否需要keepalive(keepAlive && idleMillis >= keepAliveBetweenTimeMillis),需要的話放入keepAliveConnections中,然后遍歷進行validateConnection,如果成功則更新lastKeepTimeMillis,否則執(zhí)行connection.close(),最后清空keepAliveConnections數(shù)組
小結
DestroyConnectionThread就是每隔timeBetweenEvictionRunsMillis執(zhí)行一下destroyTask,而DestroyTask的run方法主要是執(zhí)行shrink(true, keepAlive);該方法處理了evict及keepalive的邏輯,根據(jù)poolingCount遍歷connections,在checkTime為true時會根據(jù)idleMillis判斷是否需要evict,否則判斷是否需要keepalive(keepAlive && idleMillis >= keepAliveBetweenTimeMillis),需要的話放入keepAliveConnections中,然后遍歷進行validateConnection,如果成功則更新lastKeepTimeMillis,否則執(zhí)行connection.close(),最后清空keepAliveConnections數(shù)組。
jedis的keepalive是直接設置socket.setKeepAlive(true),而common-pools則沒有所謂的keepalive,本質上druid的keepalive與common-pools的testWhileIdle類似;只不過druid直接在getConnection的時候執(zhí)行testWhileIdle,這個邏輯有點奇怪,如果移除掉,而在shrink方法里頭的keepAlive邏輯刪除keepAliveBetweenTimeMillis判斷,那么就跟common-pools的testWhileIdle的邏輯一致了。druid的keepalive相當于帶了keepAliveBetweenTimeMillis的testWhileIdle。
以上就是druid的keepalive機制源碼解析的詳細內(nèi)容,更多關于druid keepalive機制的資料請關注腳本之家其它相關文章!
相關文章
idea新建mapper.xml文件詳細步驟如:mybatis-config
這篇文章主要介紹了idea新建xml模板設置,例如:mybatis-config,本文分步驟通過圖文并茂的形式給大家介紹的非常詳細,需要的朋友可以參考下2023-07-07
淺析Java常用API(Scanner,Random)匿名對象
這篇文章主要介紹了Java常用API(Scanner,Random)匿名對象,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-03-03
詳談Java編程之委托代理回調(diào)、內(nèi)部類以及匿名內(nèi)部類回調(diào)(閉包回調(diào))
下面小編就為大家?guī)硪黄斦凧ava編程之委托代理回調(diào)、內(nèi)部類以及匿名內(nèi)部類回調(diào)(閉包回調(diào))。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-05-05
Java中實現(xiàn)將jar包內(nèi)文件資源釋放出來
這篇文章主要介紹了Java中實現(xiàn)將jar包內(nèi)文件資源釋放出來的方法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-08-08

