Skip to content

Commit

Permalink
feat: 调度支持k8s job bkdevops-projects#203
Browse files Browse the repository at this point in the history
  • Loading branch information
felixncheng committed Jul 16, 2024
1 parent e2f8a1a commit c56babe
Show file tree
Hide file tree
Showing 28 changed files with 902 additions and 135 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.tencent.devops.common.time

import java.time.LocalDateTime
import java.time.ZoneId

fun LocalDateTime.toEpochMilli(zoneId: ZoneId = ZoneId.systemDefault()): Long {
return this.atZone(zoneId).toInstant().toEpochMilli()
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ package com.tencent.devops.schedule.enums
*/
enum class BlockStrategyEnum(
private val code: Int,
private val label: String
): DictItem {
private val label: String,
) : DictItem {

DEFAULT(1, "默认策略");
SERIAL_EXECUTION(1, "串行"),
DISCARD_LATER(2, "丢弃最后"),
COVER_EARLY(3, "覆盖之前"),
;

override fun code() = code
override fun description() = label
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,32 @@ package com.tencent.devops.schedule.enums
*/
enum class JobModeEnum(
private val code: Int,
private val label: String
): DictItem {
private val label: String,
val isContainer: Boolean,
val isScript: Boolean,
) : DictItem {
/**
* Java Bean
*/
BEAN(1, "Java Bean"),
BEAN(1, "Java Bean", false, false),

/**
* Shell
*/
SHELL(2, "Shell");
SHELL(2, "Shell", false, true),

/**
* K8s shell
* */
K8S_SHELL(3, "K8s shell", true, true),
;

override fun code() = code
override fun description() = label

companion object {
const val DEFAULT_IMAGE = "bash"

/**
* 根据[code]查找对应的枚举类型
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ package com.tencent.devops.schedule.enums
*/
enum class ScheduleTypeEnum(
private val code: Int,
private val label: String
): DictItem {
private val label: String,
) : DictItem {
/**
* 立即执行
* 立即执行,调度器不会进行调度,需要主动触发才会执行
*/
IMMEDIATELY(1, "立即执行"),

Expand All @@ -25,7 +25,8 @@ enum class ScheduleTypeEnum(
/**
* cron表达式
*/
CRON(4, "Cron表达式");
CRON(4, "Cron表达式"),
;

override fun code() = code
override fun description() = label
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,13 @@ data class JobCreateRequest(
/**
* 最大重试次数
*/
val maxRetryCount: Int
val maxRetryCount: Int,
/**
* 资源内容,可以是脚本,也可以是yaml,使用了basic64编码,使用时需要先解码
* */
val source: String? = null,
/**
* 镜像地址,容器任务需要
* */
val image: String? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,12 @@ data class JobInfo(
* 下次执行时间
*/
var nextTriggerTime: Long = 0,
/**
* 资源内容,可以是脚本,也可以是yaml
* */
var source: String? = null,
/**
* 镜像地址
* */
var image: String? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,12 @@ data class JobUpdateRequest(
* 最大重试次数
*/
val maxRetryCount: Int? = null,
/**
* 资源内容,可以是脚本,也可以是yaml,使用了basic64编码,使用时需要先解码
* */
val source: String? = null,
/**
* 镜像地址,容器任务需要
* */
val image: String? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import java.time.LocalDateTime

data class TriggerParam(
var jobId: String,
var jobMode: Int,
var source: String? = null,
var image: String? = null,
var jobHandler: String,
var jobParam: String,
var blockStrategy: Int,
Expand All @@ -15,5 +18,6 @@ data class TriggerParam(
var triggerTime: LocalDateTime,
var broadcastIndex: Int = 0,
var broadcastTotal: Int = 0,
var workerAddress: String? = null
var workerAddress: String? = null,
var updateTime: LocalDateTime,
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ fun TJobInfo.convert(): JobInfo {
maxRetryCount = maxRetryCount,
triggerStatus = triggerStatus,
lastTriggerTime = lastTriggerTime,
nextTriggerTime = nextTriggerTime
nextTriggerTime = nextTriggerTime,
source = source,
image = image,
)
}

Expand All @@ -49,7 +51,9 @@ fun JobInfo.convert(): TJobInfo {
maxRetryCount = maxRetryCount,
triggerStatus = triggerStatus,
lastTriggerTime = lastTriggerTime,
nextTriggerTime = nextTriggerTime
nextTriggerTime = nextTriggerTime,
source = source,
image = image,
)
}

Expand All @@ -59,7 +63,7 @@ fun TWorkerGroup.convert(): WorkerGroup {
name = name,
discoveryType = discoveryType,
updateTime = updateTime,
registryList = addressList.split(",")
registryList = addressList.split(","),
)
}

Expand All @@ -69,7 +73,7 @@ fun WorkerGroup.convert(): TWorkerGroup {
name = name,
discoveryType = discoveryType,
updateTime = updateTime,
addressList = registryList.joinToString(",")
addressList = registryList.joinToString(","),
)
}

Expand All @@ -90,7 +94,7 @@ fun TJobLog.convert(): JobLog {
executionTime = executionTime,
executionCode = executionCode,
executionMsg = executionMsg,
alarmStatus = alarmStatus
alarmStatus = alarmStatus,
)
}

Expand All @@ -111,7 +115,7 @@ fun JobLog.convert(): TJobLog {
executionTime = executionTime,
executionCode = executionCode,
executionMsg = executionMsg,
alarmStatus = alarmStatus
alarmStatus = alarmStatus,
)
}

Expand All @@ -120,7 +124,7 @@ fun TWorker.convert(): WorkerInfo {
id = id.orEmpty(),
address = address,
group = group,
updateTime = updateTime
updateTime = updateTime,
)
}

Expand All @@ -129,6 +133,6 @@ fun WorkerInfo.convert(): TWorker {
id = id.orEmpty(),
address = address,
group = group,
updateTime = updateTime
updateTime = updateTime,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,12 @@ data class TJobInfo(
* 下次执行时间
*/
var nextTriggerTime: Long,
/**
* 资源内容,可以是脚本,也可以是yaml
* */
var source: String? = null,
/**
* 镜像地址
* */
var image: String? = null,
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.tencent.devops.schedule.mongo.provider

import com.mongodb.MongoServerException
import com.mongodb.client.result.UpdateResult
import com.tencent.devops.schedule.mongo.model.TLockInfo
import com.tencent.devops.schedule.provider.LockProvider
Expand All @@ -17,7 +18,7 @@ import java.util.UUID
* 基于mongodb实现的lock
*/
class MongoLockProvider(
private val mongoTemplate: MongoTemplate
private val mongoTemplate: MongoTemplate,
) : LockProvider {
override fun acquire(key: String, expiration: Long): String? {
val query = Query.query(where(TLockInfo::id).isEqualTo(key))
Expand All @@ -29,38 +30,49 @@ class MongoLockProvider(

val options = FindAndModifyOptions().upsert(true)
.returnNew(true)
val lock = mongoTemplate.findAndModify(
query, update, options,
TLockInfo::class.java
)!!
val locked = lock.token == token
try {
val lock = mongoTemplate.findAndModify(
query,
update,
options,
TLockInfo::class.java,
)!!
val locked = lock.token == token

// 如果已过期
if (!locked && lock.expireAt < System.currentTimeMillis()) {
val deleted = mongoTemplate.remove(
Query.query(
where(TLockInfo::id).isEqualTo(key)
.and(TLockInfo::token).isEqualTo(lock.token)
.and(TLockInfo::expireAt).`is`(lock.expireAt)
),
TLockInfo::class.java
)
if (deleted.deletedCount >= 1) {
// 成功释放锁, 再次尝试获取锁
return acquire(key, expiration)
// 如果已过期
if (!locked && lock.expireAt < System.currentTimeMillis()) {
val deleted = mongoTemplate.remove(
Query.query(
where(TLockInfo::id).isEqualTo(key)
.and(TLockInfo::token).isEqualTo(lock.token)
.and(TLockInfo::expireAt).`is`(lock.expireAt),
),
TLockInfo::class.java,
)
if (deleted.deletedCount >= 1) {
// 成功释放锁, 再次尝试获取锁
return acquire(key, expiration)
}
}
return if (locked) {
logger.trace("Acquired lock for key {} with token {}", key, token)
return token
} else {
null
}
} catch (e: MongoServerException) {
if (e.code == 11000) { // duplicate key
return null
} else {
throw e
}
}

return if (locked) {
logger.trace("Acquired lock for key {} with token {}", key, token)
return token
} else null
}

override fun release(key: String, token: String): Boolean {
val query = Query.query(
where(TLockInfo::id).isEqualTo(key)
.and(TLockInfo::token).isEqualTo(token)
.and(TLockInfo::token).isEqualTo(token),
)
val deleted = mongoTemplate.remove(query, TLockInfo::class.java)
val released = deleted.deletedCount == 1L
Expand All @@ -78,7 +90,7 @@ class MongoLockProvider(
override fun refresh(key: String, token: String, expiration: Long): Boolean {
val query = Query.query(
where(TLockInfo::id).isEqualTo(key)
.and(TLockInfo::token).isEqualTo(token)
.and(TLockInfo::token).isEqualTo(token),
)
val update = Update.update(TLockInfo::expireAt.name, System.currentTimeMillis() + expiration)
val updated: UpdateResult = mongoTemplate.updateFirst(query, update, TLockInfo::class.java)
Expand All @@ -90,16 +102,16 @@ class MongoLockProvider(
} else {
logger.warn(
"Refresh query did not affect any records for key {} with token {}. " +
"This is possible when refresh interval fires for the final time " +
"after the lock has been released",
key, token
"This is possible when refresh interval fires for the final time " +
"after the lock has been released",
key,
token,
)
}

return refreshed
}


companion object {
private val logger = LoggerFactory.getLogger(MongoLockProvider::class.java)

Expand Down
Loading

0 comments on commit c56babe

Please sign in to comment.