Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[ISSUE #9184] Optimize QueueLockManager#tryLock method in Pop #9185

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.CommandCallback;
Expand Down Expand Up @@ -150,11 +151,11 @@ public static String genAckUniqueId(AckMsg ackMsg) {

public static String genBatchAckUniqueId(BatchAckMsg batchAckMsg) {
return batchAckMsg.getTopic()
+ PopAckConstants.SPLIT + batchAckMsg.getQueueId()
+ PopAckConstants.SPLIT + batchAckMsg.getAckOffsetList().toString()
+ PopAckConstants.SPLIT + batchAckMsg.getConsumerGroup()
+ PopAckConstants.SPLIT + batchAckMsg.getPopTime()
+ PopAckConstants.SPLIT + PopAckConstants.BATCH_ACK_TAG;
+ PopAckConstants.SPLIT + batchAckMsg.getQueueId()
+ PopAckConstants.SPLIT + batchAckMsg.getAckOffsetList().toString()
+ PopAckConstants.SPLIT + batchAckMsg.getConsumerGroup()
+ PopAckConstants.SPLIT + batchAckMsg.getPopTime()
+ PopAckConstants.SPLIT + PopAckConstants.BATCH_ACK_TAG;
}

public static String genCkUniqueId(PopCheckPoint ck) {
Expand Down Expand Up @@ -861,7 +862,7 @@ private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId,

private boolean isPopShouldStop(String topic, String group, int queueId) {
return brokerController.getBrokerConfig().isEnablePopMessageThreshold() &&
brokerController.getPopInflightMessageCounter().getGroupPopInFlightMessageNum(topic, group, queueId) > brokerController.getBrokerConfig().getPopInflightMessageThreshold();
brokerController.getPopInflightMessageCounter().getGroupPopInFlightMessageNum(topic, group, queueId) > brokerController.getBrokerConfig().getPopInflightMessageThreshold();
}

private long getPopOffset(String topic, String group, int queueId, int initMode, boolean init, String lockKey,
Expand Down Expand Up @@ -908,7 +909,7 @@ private long getInitOffset(String topic, String group, int queueId, int initMode
}
if (init) { // whichever initMode
this.brokerController.getConsumerOffsetManager().commitOffset(
"getPopOffset", group, topic, queueId, offset);
"getPopOffset", group, topic, queueId, offset);
}
return offset;
}
Expand Down Expand Up @@ -1002,12 +1003,13 @@ static class TimedLock {
private volatile long lockTime;

public TimedLock() {
this.lock = new AtomicBoolean(true);
// init lock status, false means not locked
this.lock = new AtomicBoolean(false);
this.lockTime = System.currentTimeMillis();
}

public boolean tryLock() {
boolean ret = lock.compareAndSet(true, false);
boolean ret = lock.compareAndSet(false, true);
if (ret) {
this.lockTime = System.currentTimeMillis();
return true;
Expand All @@ -1017,11 +1019,11 @@ public boolean tryLock() {
}

public void unLock() {
lock.set(true);
lock.set(false);
}

public boolean isLock() {
return !lock.get();
return lock.get();
}

public long getLockTime() {
Expand All @@ -1041,21 +1043,7 @@ public boolean tryLock(String topic, String consumerGroup, int queueId) {
}

public boolean tryLock(String key) {
TimedLock timedLock = expiredLocalCache.get(key);

if (timedLock == null) {
TimedLock old = expiredLocalCache.putIfAbsent(key, new TimedLock());
if (old != null) {
return false;
} else {
timedLock = expiredLocalCache.get(key);
}
}

if (timedLock == null) {
return false;
}

TimedLock timedLock = ConcurrentHashMapUtils.computeIfAbsent(expiredLocalCache, key, k -> new TimedLock());
return timedLock.tryLock();
}

Expand Down
Loading