java實(shí)現(xiàn)memcache服務(wù)器的示例代碼
什么是Memcache?
Memcache集群環(huán)境下緩存解決方案
Memcache是一個(gè)高性能的分布式的內(nèi)存對象緩存系統(tǒng),通過在內(nèi)存里維護(hù)一個(gè)統(tǒng)一的巨大的hash表,它能夠用來存儲各種格式的數(shù)據(jù),包括圖像、視頻、文件以及數(shù)據(jù)庫檢索的結(jié)果等。簡單的說就是將數(shù)據(jù)調(diào)用到內(nèi)存中,然后從內(nèi)存中讀取,從而大大提高讀取速度?! ?br />
Memcache是danga的一個(gè)項(xiàng)目,最早是LiveJournal 服務(wù)的,最初為了加速 LiveJournal 訪問速度而開發(fā)的,后來被很多大型的網(wǎng)站采用?! ?br />
Memcached是以守護(hù)程序方式運(yùn)行于一個(gè)或多個(gè)服務(wù)器中,隨時(shí)會接收客戶端的連接和操作
為什么會有Memcache和memcached兩種名稱?
其實(shí)Memcache是這個(gè)項(xiàng)目的名稱,而memcached是它服務(wù)器端的主程序文件名,知道我的意思了吧。一個(gè)是項(xiàng)目名稱,一個(gè)是主程序文件名,在網(wǎng)上看到了很多人不明白,于是混用了。
Memcached是高性能的,分布式的內(nèi)存對象緩存系統(tǒng),用于在動態(tài)應(yīng)用中減少數(shù)據(jù)庫負(fù)載,提升訪問速度。Memcached由Danga Interactive開發(fā),用于提升LiveJournal.com訪問速度的。LJ每秒動態(tài)頁面訪問量幾千次,用戶700萬。Memcached將數(shù)據(jù)庫負(fù)載大幅度降低,更好的分配資源,更快速訪問。
這篇文章將會涉及以下內(nèi)容:
- Java Socket多線程服務(wù)器
- Java IO
- Concurrency
- Memcache特性和協(xié)議
Memcache
Memcache is an in-memory key-value store for small chunks of arbitrary data (strings, objects) from results of databasecalls, API calls, or page rendering.
即內(nèi)存緩存數(shù)據(jù)庫,是一個(gè)鍵值對數(shù)據(jù)庫。該數(shù)據(jù)庫的存在是為了將從其他服務(wù)中獲取的數(shù)據(jù)暫存在內(nèi)存中,在重復(fù)訪問時(shí)可以直接從命中的緩存中返回。既加快了訪問速率,也減少了其他服務(wù)的負(fù)載。這里將實(shí)現(xiàn)一個(gè)單服務(wù)器版本的Memcache,并且支持多個(gè)客戶端的同時(shí)連接。
客戶端將與服務(wù)器建立telnet連接,然后按照Memcache協(xié)議與服務(wù)器緩存進(jìn)行交互。這里實(shí)現(xiàn)的指令為get,set和del。先來看一下各個(gè)指令的格式
set
set屬于存儲指令,存儲指令的特點(diǎn)時(shí),第一行輸入基本信息,第二行輸入其對應(yīng)的value值。
set <key> <flags> <exptime> <bytes> [noreply]\r\n
<value>\r\n
如果存儲成功,將會返回STORED,如果指令中包含noreply屬性,則服務(wù)器將不會返回信息。
該指令中每個(gè)域的內(nèi)容如下:
- key: 鍵
- flags: 16位無符號整數(shù),會在get時(shí)隨鍵值對返回
- exptime: 過期時(shí)間,以秒為單位
- bytes:即將發(fā)送的value的長度
- noreply:是否需要服務(wù)器響應(yīng),為可選屬性
如果指令不符合標(biāo)準(zhǔn),服務(wù)器將會返回ERROR。
get
get屬于獲取指令,該指令特點(diǎn)如下:
get <key>*\r\n
它支持傳入多個(gè)key的值,如果緩存命中了一個(gè)或者多個(gè)key,則會返回相應(yīng)的數(shù)據(jù),并以END作為結(jié)尾。如果沒有命中,則返回的消息中不包含該key對應(yīng)的值。格式如下:
VALUE <key> <flags> <bytes>\r\n <data block>\r\n VALUE <key> <flags> <bytes>\r\n <data block>\r\n END del
刪除指令,該指令格式如下:
del <key> [noreply]\r\n
如果刪除成功,則返回DELETED\r\n,否則返回NOT_FOUND。如果有noreply參數(shù),則服務(wù)器不會返回響應(yīng)。
JAVA SOCKET
JAVA SOCKET需要了解的只是包括TCP協(xié)議,套接字,以及IO流。這里就不詳細(xì)贅述,可以參考我的這系列文章,也建議去閱讀JAVA Network Programming。一書。
代碼實(shí)現(xiàn)
這里貼圖功能出了點(diǎn)問題,可以去文末我的項(xiàng)目地址查看類圖。
這里采用了指令模式和工廠模式實(shí)現(xiàn)指令的創(chuàng)建和執(zhí)行的解耦。指令工廠將會接收commandLine并且返回一個(gè)Command實(shí)例。每一個(gè)Command都擁有execute方法用來執(zhí)行各自獨(dú)特的操作。這里只貼上del指令的特殊實(shí)現(xiàn)。
/**
* 各種指令
* 目前支持get,set,delete
*
* 以及自定義的
* error,end
*/
public interface Command {
/**
* 執(zhí)行指令
* @param reader
* @param writer
*/
void execute(Reader reader, Writer writer);
/**
* 獲取指令的類型
* @return
*/
CommandType getType();
}
/**
* 指令工廠 單一實(shí)例
*/
public class CommandFactory {
private static CommandFactory commandFactory;
private static Cache<Item> memcache;
private CommandFactory(){}
public static CommandFactory getInstance(Cache<Item> cache) {
if (commandFactory == null) {
commandFactory = new CommandFactory();
memcache = cache;
}
return commandFactory;
}
/**
* 根據(jù)指令的類型獲取Command
* @param commandLine
* @return
*/
public Command getCommand(String commandLine){
if (commandLine.matches("^set .*$")){
return new SetCommand(commandLine, memcache);
}else if (commandLine.matches("^get .*$")){
return new GetCommand(commandLine, memcache);
}else if (commandLine.matches("^del .*$")){
return new DeleteCommand(commandLine, memcache);
}else if (commandLine.matches("^end$")){
return new EndCommand(commandLine);
}else{
return new ErrorCommand(commandLine, ErrorCommand.ErrorType.ERROR);
}
}
}
/**
* 刪除緩存指令
*/
public class DeleteCommand implements Command{
private final String command;
private final Cache<Item> cache;
private String key;
private boolean noReply;
public DeleteCommand(final String command, final Cache<Item> cache){
this.command = command;
this.cache = cache;
initCommand();
}
private void initCommand(){
if (this.command.contains("noreply")){
noReply = true;
}
String[] info = command.split(" ");
key = info[1];
}
@Override
public void execute(Reader reader, Writer writer) {
BufferedWriter bfw = (BufferedWriter) writer;
Item item = cache.delete(key);
if (!noReply){
try {
if (item == null){
bfw.write("NOT_FOUND\r\n");
}else {
bfw.write("DELETED\r\n");
}
bfw.flush();
} catch (IOException e) {
try {
bfw.write("ERROR\r\n");
bfw.flush();
} catch (IOException e1) {
e1.printStackTrace();
}
e.printStackTrace();
}
}
}
@Override
public CommandType getType() {
return CommandType.SEARCH;
}
}
然后是實(shí)現(xiàn)內(nèi)存服務(wù)器,為了支持先進(jìn)先出功能,這里使用了LinkedTreeMap作為底層實(shí)現(xiàn),并且重寫了removeOldest方法。同時(shí)還使用CacheManager的后臺線程及時(shí)清除過期的緩存條目。
public class Memcache implements Cache<Item>{
private Logger logger = Logger.getLogger(Memcache.class.getName());
//利用LinkedHashMap實(shí)現(xiàn)LRU
private static LinkedHashMap<String, Item> cache;
private final int maxSize;
//負(fù)載因子
private final float DEFAULT_LOAD_FACTOR = 0.75f;
public Memcache(final int maxSize){
this.maxSize = maxSize;
//確保cache不會在達(dá)到maxSize之后自動擴(kuò)容
int capacity = (int) Math.ceil(maxSize / DEFAULT_LOAD_FACTOR) + 1;
this.cache = new LinkedHashMap<String, Item>(capacity, DEFAULT_LOAD_FACTOR, true){
@Override
protected boolean removeEldestEntry(Map.Entry<String,Item> eldest) {
if (size() > maxSize){
logger.info("緩存數(shù)量已經(jīng)達(dá)到上限,會刪除最近最少使用的條目");
}
return size() > maxSize;
}
};
//實(shí)現(xiàn)同步訪問
Collections.synchronizedMap(cache);
}
public synchronized boolean isFull(){
return cache.size() >= maxSize;
}
@Override
public Item get(String key) {
Item item = cache.get(key);
if (item == null){
logger.info("緩存中key:" + key + "不存在");
return null;
}else if(item!=null && item.isExpired()){ //如果緩存過期則刪除并返回null
logger.info("從緩存中讀取key:" + key + " value:" + item.getValue() + "已經(jīng)失效");
cache.remove(key);
return null;
}
logger.info("從緩存中讀取key:" + key + " value:" + item.getValue() + " 剩余有效時(shí)間" + item.remainTime());
return item;
}
@Override
public void set(String key, Item value) {
logger.info("向緩存中寫入key:" + key + " value:" + value);
cache.put(key, value);
}
@Override
public Item delete(String key) {
logger.info("從緩存中刪除key:" + key);
return cache.remove(key);
}
@Override
public int size(){
return cache.size();
}
@Override
public int capacity() {
return maxSize;
}
@Override
public Iterator<Map.Entry<String, Item>> iterator() {
return cache.entrySet().iterator();
}
}
/**
* 緩存管理器
* 后臺線程
* 將cache中過期的緩存刪除
*/
public class CacheManager implements Runnable {
private Logger logger = Logger.getLogger(CacheManager.class.getName());
//緩存
public Cache<Item> cache;
public CacheManager(Cache<Item> cache){
this.cache = cache;
}
@Override
public void run() {
while (true){
Iterator<Map.Entry<String, Item>> itemIterator = cache.iterator();
while (itemIterator.hasNext()){
Map.Entry<String, Item> entry = itemIterator.next();
Item item = entry.getValue();
if(item.isExpired()){
logger.info("key:" + entry.getKey() + " value" + item.getValue() + " 已經(jīng)過期,從數(shù)據(jù)庫中刪除");
itemIterator.remove();
}
}
try {
//每隔5秒鐘再運(yùn)行該后臺程序
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
最后是實(shí)現(xiàn)一個(gè)多線程的Socket服務(wù)器,這里就是將ServerSocket綁定到一個(gè)接口,并且將accept到的Socket交給額外的線程處理。
/**
* 服務(wù)器
*/
public class IOServer implements Server {
private boolean stop;
//端口號
private final int port;
//服務(wù)器線程
private ServerSocket serverSocket;
private final Logger logger = Logger.getLogger(IOServer.class.getName());
//線程池,線程容量為maxConnection
private final ExecutorService executorService;
private final Cache<Item> cache;
public IOServer(int port, int maxConnection, Cache<Item> cache){
if (maxConnection<=0) throw new IllegalArgumentException("支持的最大連接數(shù)量必須為正整數(shù)");
this.port = port;
executorService = Executors.newFixedThreadPool(maxConnection);
this.cache = cache;
}
@Override
public void start() {
try {
serverSocket = new ServerSocket(port);
logger.info("服務(wù)器在端口"+port+"上啟動");
while (true){
try {
Socket socket = serverSocket.accept();
logger.info("收到"+socket.getLocalAddress()+"的連接");
executorService.submit(new SocketHandler(socket, cache));
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
logger.log(Level.WARNING, "服務(wù)器即將關(guān)閉...");
e.printStackTrace();
} finally {
executorService.shutdown();
shutDown();
}
}
/**
* 服務(wù)器是否仍在運(yùn)行
* @return
*/
public boolean isRunning() {
return !serverSocket.isClosed();
}
/**
* 停止服務(wù)器
*/
public void shutDown(){
try {
if (serverSocket!=null){
serverSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 處理各個(gè)客戶端的連接
* 在獲得end指令后關(guān)閉連接s
*/
public class SocketHandler implements Runnable{
private static Logger logger = Logger.getLogger(SocketHandler.class.getName());
private final Socket socket;
private final Cache<Item> cache;
private boolean finish;
public SocketHandler(Socket s, Cache<Item> cache){
this.socket = s;
this.cache = cache;
}
@Override
public void run() {
try {
//獲取socket輸入流
final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
//獲取socket輸出流
final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
CommandFactory commandFactory = CommandFactory.getInstance(cache);
while (!finish){
final String commandLine = reader.readLine();
logger.info("ip:" + socket.getLocalAddress() + " 指令:" + commandLine);
if (commandLine == null || commandLine.trim().isEmpty()) {
continue;
}
//使用指令工廠獲取指令實(shí)例
final Command command = commandFactory.getCommand(commandLine);
command.execute(reader, writer);
if (command.getType() == CommandType.END){
logger.info("請求關(guān)閉連接");
finish = true;
}
}
} catch (IOException e) {
e.printStackTrace();
logger.info("關(guān)閉來自" + socket.getLocalAddress() + "的連接");
} finally {
try {
if (socket != null){
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
項(xiàng)目地址請戳這里,如果覺得還不錯(cuò)的話,希望能給個(gè)星哈><
參考資料
memcached官網(wǎng)
memcache協(xié)議
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- Java中常見的5種WEB服務(wù)器介紹
- java網(wǎng)絡(luò)編程之socket網(wǎng)絡(luò)編程示例(服務(wù)器端/客戶端)
- Java獲取此次請求URL以及服務(wù)器根路徑的方法
- JAVA技術(shù)實(shí)現(xiàn)上傳下載文件到FTP服務(wù)器(完整)
- Java實(shí)現(xiàn)圖片上傳到服務(wù)器并把上傳的圖片讀取出來
- java實(shí)現(xiàn)服務(wù)器文件打包zip并下載的示例(邊打包邊下載)
- Java實(shí)現(xiàn)FTP服務(wù)器功能實(shí)例代碼
- Java如何從服務(wù)器中下載圖片
- java獲取服務(wù)器基本信息的方法
相關(guān)文章
Java中ScheduledExecutorService的使用方法詳解
ScheduledExecutorService是ExecutorService的一個(gè)子接口,它主要用于在給定的延遲之后或周期性地執(zhí)行任務(wù),本文主要介紹了ScheduledExecutorService的使用方法,感興趣的可以了解下2024-12-12
EntityWrapper如何在and條件中嵌套o(hù)r語句
這篇文章主要介紹了EntityWrapper如何在and條件中嵌套o(hù)r語句,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03
RabbitMQ 實(shí)現(xiàn)延遲隊(duì)列的兩種方式詳解
很多場景下我們都需要延遲隊(duì)列。這篇文章主要以RabbitMQ為例來和大家聊一聊延遲隊(duì)列的玩法。文中的代碼具有一定的學(xué)習(xí)價(jià)值,感興趣的同學(xué)可以了解一下2021-12-12
java實(shí)現(xiàn)一個(gè)掃描包的工具類實(shí)例代碼
很多框架,比如springmvc,mybatis等使用注解,為了處理注解,必然要對包進(jìn)行掃描,所以下面這篇文章主要給大家分享介紹了關(guān)于利用java如何實(shí)現(xiàn)一個(gè)掃描包的工具類,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下。2017-10-10
Spring Boot 中的 @ConditionalOnBean 注解場景分析
本文詳細(xì)介紹了Spring Boot中的@ConditionalOnBean注解的使用場景、原理和基本用法,通過多個(gè)示例,展示了如何使用該注解根據(jù)Bean是否存在來動態(tài)地注冊或跳過特定的Bean,感興趣的朋友一起看看吧2025-03-03
SpringBoot使用Sharding-JDBC實(shí)現(xiàn)數(shù)據(jù)分片和讀寫分離的方法
本文主要介紹了SpringBoot使用Sharding-JDBC實(shí)現(xiàn)數(shù)據(jù)分片和讀寫分離,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-10-10

