Skip to content

Commit

Permalink
fix(controller): fix listStreamObjects interface (#825)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Dec 12, 2023
1 parent f1bc8ca commit 8870c10
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.automq.rocketmq.metadata.service.cache;

import com.automq.rocketmq.metadata.dao.S3StreamObject;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -127,10 +126,12 @@ public List<S3StreamObject> listStreamObjects(long streamId, long startOffset, l

lock.readLock().lock();
try {
List<S3StreamObject> reversed = Lists.reverse(list);
return reversed.stream()
.filter(s3StreamObject -> s3StreamObject.getEndOffset() >= startOffset
&& s3StreamObject.getStartOffset() <= endOffset)
return list.stream()
.filter(s3StreamObject -> {
long start = s3StreamObject.getStartOffset();
long end = s3StreamObject.getEndOffset();
return start >= startOffset && (end <= endOffset || endOffset == -1);
})
.limit(limit)
.toList();
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.metadata.service.cache;

import com.automq.rocketmq.metadata.dao.S3StreamObject;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class S3StreamObjectCacheTest {

@Test
void listStreamObjects() {
S3StreamObjectCache cache = new S3StreamObjectCache();

List<S3StreamObject> list = new ArrayList<>();

S3StreamObject streamObject = new S3StreamObject();
streamObject.setStartOffset(0L);
streamObject.setEndOffset(100L);
list.add(streamObject);

streamObject = new S3StreamObject();
streamObject.setStartOffset(100L);
streamObject.setEndOffset(200L);
list.add(streamObject);

cache.initStream(1, list);

list = new ArrayList<>();

streamObject = new S3StreamObject();
streamObject.setStartOffset(200L);
streamObject.setEndOffset(300L);
list.add(streamObject);

streamObject = new S3StreamObject();
streamObject.setStartOffset(300L);
streamObject.setEndOffset(400L);
list.add(streamObject);

cache.cache(1, list);

List<S3StreamObject> result = cache.listStreamObjects(1, 0, 400, 100);
assertEquals(4, result.size());

result = cache.listStreamObjects(1, 0, 100, 100);
assertEquals(1, result.size());
assertEquals(0L, result.get(0).getStartOffset());

result = cache.listStreamObjects(1, 200, 400, 1);
assertEquals(1, result.size());
assertEquals(200L, result.get(0).getStartOffset());

result = cache.listStreamObjects(1, 100, -1, 2);
assertEquals(2, result.size());
assertEquals(100L, result.get(0).getStartOffset());
assertEquals(200L, result.get(1).getStartOffset());
}
}

0 comments on commit 8870c10

Please sign in to comment.