Skip to content

Commit

Permalink
从flink复制部分代码
Browse files Browse the repository at this point in the history
  • Loading branch information
entropy-cloud committed Jan 7, 2025
1 parent bba930a commit e51d9aa
Show file tree
Hide file tree
Showing 19 changed files with 3,446 additions and 0 deletions.
77 changes: 77 additions & 0 deletions docs/dev-guide/stream/windowing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# 窗口函数

## WindowOperator

1. 先为元素分配一组窗口
2. 针对每个窗口,检查trigger是否触发,并修改timer状态
3. timer触发时,同样是检查trigger是否触发

```javascript
void processElement(StreamRecord<IN> element){
const elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
for (const window of elementWindows) {
TriggerResult triggerResult = triggerContext.onElement(element);

if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents != null) {
emitWindowContents(actualWindow, contents);
}
}

if (triggerResult.isPurge()) {
windowState.clear();
}
registerCleanupTimer(actualWindow);
}
}

void onEventTime(InternalTimer<K, W> timer) {
triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();

MergingWindowSet<W> mergingWindows;

if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
// window and therefore the Trigger state, however, so nothing to do.
return;
} else {
windowState.setCurrentNamespace(stateWindow);
}
} else {
windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;
}

TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());

if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents != null) {
emitWindowContents(triggerContext.window, contents);
}
}

if (triggerResult.isPurge()) {
windowState.clear();
}

if (windowAssigner.isEventTime()
&& isCleanupTime(triggerContext.window, timer.getTimestamp())) {
clearAllState(triggerContext.window, windowState, mergingWindows);
}

if (mergingWindows != null) {
// need to make sure to update the merging state in state
mergingWindows.persist();
}
}
```

EvictorWindowOperator和WindowOperator的区别仅在于emit的时候是否调用evictor来删除窗口。
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nop.cluster.resources;

import java.math.BigDecimal;

import static java.math.BigDecimal.ROUND_HALF_UP;

/**
* Represents CPU resource.
*/
public class CPUResource extends Resource<CPUResource> {

private static final long serialVersionUID = 7228645888210984393L;

public static final String NAME = "CPU";

public CPUResource(double value) {
super(NAME, value);
}

private CPUResource(BigDecimal value) {
super(NAME, value);
}

@Override
public CPUResource create(BigDecimal value) {
return new CPUResource(value);
}

public String toHumanReadableString() {
return String.format(
"%.2f cores", getValue().setScale(2, ROUND_HALF_UP).stripTrailingZeros());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nop.cluster.resources;

import java.math.BigDecimal;

/**
* An external resource.
*/
public class ExternalResource extends Resource<ExternalResource> {
private static final long serialVersionUID = 1L;

public ExternalResource(String name, double value) {
super(name, value);
}

private ExternalResource(String name, BigDecimal value) {
super(name, value);
}

@Override
protected ExternalResource create(BigDecimal value) {
return new ExternalResource(getName(), value);
}
}
Loading

0 comments on commit e51d9aa

Please sign in to comment.