Skip to content

Commit

Permalink
feat: block read/write (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Jan 9, 2025
1 parent 8917f9f commit 5b404fc
Show file tree
Hide file tree
Showing 27 changed files with 844 additions and 103 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec;

import com.netease.nim.camellia.codec.Pack;
import com.netease.nim.camellia.codec.Unpack;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo;

import java.util.HashMap;
import java.util.Map;

/**
* Created by caojiajun on 2025/1/9
*/
public record StringValue(byte[] key, byte[] value) {

public static byte[] encode(byte[] key, byte[] value) {
Pack pack = new Pack(key.length + value.length + 8);
pack.putVarbin(key);
pack.putVarbin(value);
pack.getBuffer().capacity(pack.getBuffer().readableBytes());
return pack.getBuffer().array();
}

public static StringValue decode(byte[] data) {
Unpack unpack = new Unpack(data);
byte[] key = unpack.popVarbin();
byte[] value = unpack.popVarbin();
return new StringValue(key, value);
}

public static Map<KeyInfo, byte[]> encodeMap(Map<KeyInfo, byte[]> map) {
Map<KeyInfo, byte[]> result = new HashMap<>();
for (Map.Entry<KeyInfo, byte[]> entry : map.entrySet()) {
result.put(entry.getKey(), StringValue.encode(entry.getKey().getKey(), entry.getValue()));
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
public class StringValueCodec {

public static StringValueCodecResult encode(short slot, BlockType blockType, IValueManifest valueManifest, List<Pair<KeyInfo, byte[]>> values) throws IOException {
public static StringValueEncodeResult encode(short slot, BlockType blockType, IValueManifest valueManifest, List<Pair<KeyInfo, byte[]>> values) throws IOException {
//
//data to sub-block
List<SubBlock> subBlocks = new ArrayList<>();
Expand Down Expand Up @@ -89,21 +89,24 @@ public static StringValueCodecResult encode(short slot, BlockType blockType, IVa
int offset = 0;
short subBlockCount = 0;
buffer.putInt(0);//4
buffer.putInt(blockType.getBlockSize() - 10);//4
buffer.putShort(subBlockCount);//2
for (SubBlock block : subBlocks) {
//
if (buffer.remaining() < block.size()) {
//sub block merge
int crc = RedisClusterCRC16Utils.getCRC16(buffer.array(), 6, buffer.array().length);
int crc = RedisClusterCRC16Utils.getCRC16(buffer.array(), 10, buffer.array().length);
buffer.putInt(0, crc);//4
buffer.putShort(4, subBlockCount);//2
buffer.putInt(4, buffer.remaining());
buffer.putShort(8, subBlockCount);//2
blockInfos.add(new BlockInfo(blockType, location, buffer.array()));
//
//new block
location = valueManifest.allocate(slot, blockType);
buffer = ByteBuffer.allocate(blockType.getBlockSize());
subBlockCount = 0;
buffer.putInt(0);//4
buffer.putInt(blockType.getBlockSize() - 10);
buffer.putShort(subBlockCount);//2
}
//add sub-block to block
Expand All @@ -121,24 +124,27 @@ public static StringValueCodecResult encode(short slot, BlockType blockType, IVa
subBlockCount++;
}
//sub block merge
int crc = RedisClusterCRC16Utils.getCRC16(buffer.array(), 6, buffer.array().length);
int crc = RedisClusterCRC16Utils.getCRC16(buffer.array(), 10, buffer.array().length);
buffer.putInt(0, crc);
buffer.putShort(4, subBlockCount);
buffer.putInt(4, buffer.remaining());
buffer.putShort(8, subBlockCount);
blockInfos.add(new BlockInfo(blockType, location, buffer.array()));
}
return new StringValueCodecResult(blockInfos, oldLocations);
return new StringValueEncodeResult(blockInfos, oldLocations);
}

public static List<byte[]> decode(byte[] data, BlockType blockType) {
public static StringValueDecodeResult decode(byte[] data, BlockType blockType) {
ByteBuffer buffer = ByteBuffer.wrap(data);
int crc1 = buffer.getInt();
int crc2 = RedisClusterCRC16Utils.getCRC16(data, 6, data.length);
if (crc1 != crc2) {
return new ArrayList<>();
return new StringValueDecodeResult(new ArrayList<>(), -1);
}

List<byte[]> values = new ArrayList<>();

int remaining = buffer.getInt();

short subBlockCount = buffer.getShort();
for (int i=0; i<subBlockCount; i++) {
CompressType compressType = CompressType.getByValue(buffer.get());
Expand All @@ -160,7 +166,7 @@ public static List<byte[]> decode(byte[] data, BlockType blockType) {
values.add(value);
}
}
return values;
return new StringValueDecodeResult(values, remaining);
}

private static class SubBlock {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec;

import java.util.List;

/**
* Created by caojiajun on 2025/1/9
*/
public record StringValueDecodeResult(List<byte[]> values, int remaining) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec;

import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.block.BlockInfo;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.block.ValueLocation;

import java.util.List;

/**
* Created by caojiajun on 2025/1/8
*/
public record StringValueEncodeResult(List<BlockInfo> blockInfos, List<ValueLocation> oldLocations) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.netease.nim.camellia.redis.proxy.upstream.local.storage.command;

import com.netease.nim.camellia.redis.proxy.command.Command;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.compact.CompactExecutor;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.StringReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.wal.WalGroup;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/**
* Created by caojiajun on 2025/1/3
*/
public abstract class CommandOnLocalStorage {

protected WalGroup walGroup;

protected CompactExecutor compactExecutor;

protected KeyReadWrite keyReadWrite;
protected StringReadWrite stringReadWrite;

/**
* redis command of commander
* @return redis-command
*/
public abstract RedisCommand redisCommand();

/**
* check param
* @param command command
* @return success or fail
*/
protected abstract boolean parse(Command command);

/**
* execute command
* @param slot slot
* @param command command
* @return reply
*/
protected abstract Reply execute(short slot, Command command) throws Exception;

/**
* check and flush after write
* @param slot slot
* @throws IOException exception
*/
protected void afterWrite(short slot) throws IOException {
if (keyReadWrite.needFlush(slot) || stringReadWrite.needFlush(slot)) {
//key flush prepare
keyReadWrite.flushPrepare(slot);
//flush string value
CompletableFuture<FlushResult> future1 = stringReadWrite.flush(slot);
//flush key
future1.thenAccept(result -> keyReadWrite.flush(slot));
}
compactExecutor.compact(slot);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.netease.nim.camellia.redis.proxy.upstream.local.storage.command;


import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf;
import com.netease.nim.camellia.redis.proxy.util.MpscSlotHashExecutor;
import com.netease.nim.camellia.tools.utils.SysUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by caojiajun on 2025/1/3
*/
public class LocalStorageExecutors {

private static final Logger logger = LoggerFactory.getLogger(LocalStorageExecutors.class);

private static volatile LocalStorageExecutors INSTANCE;

private final MpscSlotHashExecutor commandExecutor;

private LocalStorageExecutors() {
int threads = ProxyDynamicConf.getInt("local.storage.command.executor.threads", SysUtils.getCpuNum() * 4);
int queueSize = ProxyDynamicConf.getInt("local.storage.command.executor.queue.size", 1024*128);
commandExecutor = new MpscSlotHashExecutor("local-storage-command-executor", threads, queueSize, new MpscSlotHashExecutor.AbortPolicy());
logger.info("LocalStorageExecutors init success, threads = {}, queueSize = {}", threads, queueSize);
}

public static LocalStorageExecutors getInstance() {
if (INSTANCE == null) {
synchronized (LocalStorageExecutors.class) {
if (INSTANCE == null) {
INSTANCE = new LocalStorageExecutors();
}
}
}
return INSTANCE;
}

public MpscSlotHashExecutor getCommandExecutor() {
return commandExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.netease.nim.camellia.redis.proxy.reply.ErrorReply;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.CacheKey;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.CommandOnEmbeddedStorage;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.CommandOnLocalStorage;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.DataType;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo;

Expand All @@ -15,7 +15,7 @@
* <p>
* Created by caojiajun on 2025/1/3
*/
public class Get extends CommandOnEmbeddedStorage {
public class Get extends CommandOnLocalStorage {

@Override
public RedisCommand redisCommand() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.reply.StatusReply;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.CacheKey;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.CommandOnEmbeddedStorage;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.CommandOnLocalStorage;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.DataType;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.string.SetCommander;
Expand All @@ -18,7 +18,7 @@
* SET key value [NX | XX] [GET] [EX seconds | PX milliseconds | EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL]
* Created by caojiajun on 2025/1/3
*/
public class Set extends CommandOnEmbeddedStorage {
public class Set extends CommandOnLocalStorage {

private static final int nx = 1;
private static final int xx = 2;
Expand Down Expand Up @@ -120,7 +120,7 @@ protected Reply execute(short slot, Command command) throws Exception {
}
keyReadWrite.put(slot, keyInfo);

checkAndFlush(slot);
afterWrite(slot);

if (get) {
return new BulkReply(oldValue);
Expand Down
Loading

0 comments on commit 5b404fc

Please sign in to comment.