PowerJob的AliOssService工作流程源碼解讀
序
本文主要研究一下PowerJob的AliOssService
DFsService
tech/powerjob/server/extension/dfs/DFsService.java
public interface DFsService {
/**
* 存儲文件
* @param storeRequest 存儲請求
* @throws IOException 異常
*/
void store(StoreRequest storeRequest) throws IOException;
/**
* 下載文件
* @param downloadRequest 文件下載請求
* @throws IOException 異常
*/
void download(DownloadRequest downloadRequest) throws IOException;
/**
* 獲取文件元信息
* @param fileLocation 文件位置
* @return 存在則返回文件元信息
* @throws IOException 異常
*/
Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException;
/**
* 清理 powerjob 認為“過期”的文件
* 部分存儲系統(tǒng)自帶生命周期管理(如阿里云OSS,則不需要單獨實現(xiàn)該方法)
* @param bucket bucket
* @param days 天數(shù),需要清理超過 X 天的文件
*/
default void cleanExpiredFiles(String bucket, int days) {
}
}DFsService接口定義了store、download、fetchFileMeta、cleanExpiredFiles方法
AbstractDFsService
tech/powerjob/server/persistence/storage/AbstractDFsService.java
@Slf4j
public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, DisposableBean {
protected ApplicationContext applicationContext;
public AbstractDFsService() {
log.info("[DFsService] invoke [{}]'s constructor", this.getClass().getName());
}
abstract protected void init(ApplicationContext applicationContext);
protected static final String PROPERTY_KEY = "oms.storage.dfs";
protected static String fetchProperty(Environment environment, String dfsType, String key) {
String pKey = String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key);
return environment.getProperty(pKey);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
log.info("[DFsService] invoke [{}]'s setApplicationContext", this.getClass().getName());
init(applicationContext);
}
}AbstractDFsService聲明實現(xiàn)DFsService、ApplicationContextAware、DisposableBean接口,它在setApplicationContext方法執(zhí)行了init
AliOssService
tech/powerjob/server/persistence/storage/impl/AliOssService.java
@Slf4j
@Priority(value = Integer.MAX_VALUE - 1)
@Conditional(AliOssService.AliOssCondition.class)
public class AliOssService extends AbstractDFsService {
private static final String TYPE_ALI_OSS = "alioss";
private static final String KEY_ENDPOINT = "endpoint";
private static final String KEY_BUCKET = "bucket";
private static final String KEY_CREDENTIAL_TYPE = "credential_type";
private static final String KEY_AK = "ak";
private static final String KEY_SK = "sk";
private static final String KEY_TOKEN = "token";
private OSS oss;
private String bucket;
private static final int DOWNLOAD_PART_SIZE = 10240;
private static final String NO_SUCH_KEY = "NoSuchKey";
//......
void initOssClient(String endpoint, String bucket, String mode, String ak, String sk, String token) throws Exception {
log.info("[AliOssService] init OSS by config: endpoint={},bucket={},credentialType={},ak={},sk={},token={}", endpoint, bucket, mode, ak, sk, token);
if (StringUtils.isEmpty(bucket)) {
throw new IllegalArgumentException("'oms.storage.dfs.alioss.bucket' can't be empty, please creat a bucket in aliyun oss console then config it to powerjob");
}
this.bucket = bucket;
CredentialsProvider credentialsProvider;
CredentialType credentialType = CredentialType.parse(mode);
switch (credentialType) {
case PWD:
credentialsProvider = new DefaultCredentialProvider(ak, sk, token);
break;
case SYSTEM_PROPERTY:
credentialsProvider = CredentialsProviderFactory.newSystemPropertiesCredentialsProvider();
break;
default:
credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
}
this.oss = new OSSClientBuilder().build(endpoint, credentialsProvider);
log.info("[AliOssService] initialize successfully, THIS_WILL_BE_THE_STORAGE_LAYER.");
}
//......
}AliOssService繼承了AbstractDFsService
store
@Override
public void store(StoreRequest storeRequest) throws IOException {
ObjectMetadata objectMetadata = new ObjectMetadata();
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, parseFileName(storeRequest.getFileLocation()), storeRequest.getLocalFile(), objectMetadata);
oss.putObject(putObjectRequest);
}store方法創(chuàng)建PutObjectRequest,使用oss.putObject進行上傳
download
@Override
public void download(DownloadRequest downloadRequest) throws IOException {
FileLocation dfl = downloadRequest.getFileLocation();
DownloadFileRequest downloadFileRequest = new DownloadFileRequest(bucket, parseFileName(dfl), downloadRequest.getTarget().getAbsolutePath(), DOWNLOAD_PART_SIZE);
try {
FileUtils.forceMkdirParent(downloadRequest.getTarget());
oss.downloadFile(downloadFileRequest);
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}download方法則根據(jù)DownloadRequest指定的FileLocation創(chuàng)建DownloadFileRequest,然后通過oss.downloadFile(downloadFileRequest)進行下載
fetchFileMeta
@Override
public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
try {
ObjectMetadata objectMetadata = oss.getObjectMetadata(bucket, parseFileName(fileLocation));
return Optional.ofNullable(objectMetadata).map(ossM -> {
Map<String, Object> metaInfo = Maps.newHashMap();
metaInfo.putAll(ossM.getRawMetadata());
if (ossM.getUserMetadata() != null) {
metaInfo.putAll(ossM.getUserMetadata());
}
return new FileMeta()
.setLastModifiedTime(ossM.getLastModified())
.setLength(ossM.getContentLength())
.setMetaInfo(metaInfo);
});
} catch (OSSException oe) {
String errorCode = oe.getErrorCode();
if (NO_SUCH_KEY.equalsIgnoreCase(errorCode)) {
return Optional.empty();
}
ExceptionUtils.rethrow(oe);
}
return Optional.empty();
}fetchFileMeta方法通過oss.getObjectMetadata獲取ObjectMetadata
cleanExpiredFiles
@Override
public void cleanExpiredFiles(String bucket, int days) {
/*
阿里云 OSS 自帶生命周期管理,請參考文檔進行配置,代碼層面不進行實現(xiàn)(浪費服務(wù)器資源)https://help.aliyun.com/zh/oss/user-guide/overview-54
阿里云 OSS 自帶生命周期管理,請參考文檔進行配置,代碼層面不進行實現(xiàn)(浪費服務(wù)器資源)https://help.aliyun.com/zh/oss/user-guide/overview-54
阿里云 OSS 自帶生命周期管理,請參考文檔進行配置,代碼層面不進行實現(xiàn)(浪費服務(wù)器資源)https://help.aliyun.com/zh/oss/user-guide/overview-54
*/
}cleanExpiredFiles則是空操作
init
protected void init(ApplicationContext applicationContext) {
Environment environment = applicationContext.getEnvironment();
String endpoint = fetchProperty(environment, TYPE_ALI_OSS, KEY_ENDPOINT);
String bkt = fetchProperty(environment, TYPE_ALI_OSS, KEY_BUCKET);
String ct = fetchProperty(environment, TYPE_ALI_OSS, KEY_CREDENTIAL_TYPE);
String ak = fetchProperty(environment, TYPE_ALI_OSS, KEY_AK);
String sk = fetchProperty(environment, TYPE_ALI_OSS, KEY_SK);
String token = fetchProperty(environment, TYPE_ALI_OSS, KEY_TOKEN);
try {
initOssClient(endpoint, bkt, ct, ak, sk, token);
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
}init則是通過environment獲取相關(guān)屬性,然后執(zhí)行initOssClient
小結(jié)
DFsService接口定義了store、download、fetchFileMeta、cleanExpiredFiles方法;AbstractDFsService聲明實現(xiàn)DFsService、ApplicationContextAware、DisposableBean接口,它在setApplicationContext方法執(zhí)行了init;AliOssService繼承了AbstractDFsService,通過ossClient實現(xiàn)了store、download、fetchFileMeta方法。
以上就是PowerJob的AliOssServiceg工作流程源碼解讀的詳細內(nèi)容,更多關(guān)于PowerJob AliOssServiceg的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
IDEA?2020.3最新永久激活碼(免費激活到?2099?年,親測有效)
分享一下?IntelliJ?IDEA?2020.3.1?最新激活注冊碼,破解教程如下,可免費激活至?2099?年,親測有效,本文給大家分享兩種方法,感興趣的朋友參考下吧2021-01-01
RestTemplate發(fā)送請求時Cookie的影響及注意事項說明
這篇文章主要介紹了RestTemplate發(fā)送請求時Cookie的影響及注意事項說明,具有很好的參考價值,希望對大家有所幫助。2023-07-07
Kafka單節(jié)點偽分布式集群搭建實現(xiàn)過程詳解
這篇文章主要介紹了Kafka單節(jié)點偽分布式集群搭建實現(xiàn)過程詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-11-11
SpringBoot實現(xiàn)RAS+AES自動接口解密
本文主要介紹了SpringBoot實現(xiàn)RAS+AES自動接口解密,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-03-03
JetBrains IntelliJ IDEA 配置優(yōu)化技巧
這篇文章主要介紹了JetBrains IntelliJ IDEA 配置優(yōu)化技巧,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-12-12

