Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE#47]Optimize ConnectRecord api #50

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
c15757b
SinkTask and SourceTask implement the validate method https://github.…
sunxiaojian Apr 12, 2022
dcc706d
Adjust the init and start methods of the component interface
sunxiaojian Apr 15, 2022
fa134ee
Set pause and resume to deprecated methods. It feels like they can be…
sunxiaojian Apr 15, 2022
55b9ac0
Add struct object and optimize schema and schema builder API #41
sunxiaojian May 24, 2022
c72659e
add offset storage writer #41
sunxiaojian May 24, 2022
e6deeb8
add getter and setter method #41
sunxiaojian Jun 4, 2022
aa9f315
add SchemaAndValue #41
sunxiaojian Jun 4, 2022
23ed62a
add logical type #41
sunxiaojian Jun 4, 2022
26fe155
Schemabuilder add required method
sunxiaojian Jun 6, 2022
3e6f1e6
schema add hashCode and equals method
sunxiaojian Jun 6, 2022
14765cb
fixed doc method
sunxiaojian Jun 6, 2022
a6b3c79
Field add equals and hashcode method
sunxiaojian Jun 7, 2022
427e009
optimize api #85
sunxiaojian Jun 8, 2022
1dc98a3
Merge branch 'dev'
sunxiaojian Jun 8, 2022
a38fa65
Merge branch 'optimize-api'
sunxiaojian Jun 8, 2022
c2314fb
Merge branch 'openmessaging:master' into master
sunxiaojian Jun 9, 2022
c4b1836
Optimize transform api #45
sunxiaojian Jun 9, 2022
c76d2fb
Optimize transform api and add RecordConverter
sunxiaojian Jun 18, 2022
ed258b0
Merge branch 'openmessaging:master' into master
sunxiaojian Jun 21, 2022
5de17b2
Optimize ConnectRecord api #47
sunxiaojian Jun 22, 2022
898863c
upgrade api to 0.1.4-SNAPSHOT;
sunxiaojian Jun 22, 2022
6c08958
optimize connector
sunxiaojian Jun 22, 2022
db8f31f
Merge branch 'openmessaging:master' into master
sunxiaojian Jun 28, 2022
1b698b7
Merge branch 'openmessaging:master' into dev
sunxiaojian Jun 28, 2022
ea7e3c0
[ISSUE#47]Optimize ConnectRecord api #50
sunxiaojian Jul 13, 2022
fabb29e
Optimize ConnectRecord remove queueId field #50
sunxiaojian Jul 13, 2022
e94593e
Optimize ConnectRecord remove queueId field #47
sunxiaojian Jul 13, 2022
80fb698
Merge branch 'dev' of https://github.com/sunxiaojian/openconnect into…
sunxiaojian Jul 13, 2022
2fa5bb1
Merge branch 'master' into dev
sunxiaojian Jul 14, 2022
d539510
Merge branch 'openmessaging:master' into master
sunxiaojian Jul 15, 2022
c254a7a
Connectrecord add a key field to identify the unique data #53
sunxiaojian Jul 15, 2022
58a9143
Merge branch 'openmessaging:master' into master
sunxiaojian Jul 20, 2022
7036d48
merge new features #47
sunxiaojian Jul 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void init(ConnectorContext connectorContext) {
this.connectorContext = connectorContext;
}

protected ConnectorContext getConnectorContext() {
protected ConnectorContext context() {
return connectorContext;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
*/
public abstract class SinkConnector extends Connector {

public SinkConnector() {
super();

@Override
protected SinkConnectorContext context() {
return (SinkConnectorContext) connectorContext;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.connector.api.component.task.sink;


import io.openmessaging.connector.api.component.connector.ConnectorContext;

/**
* source connector context
*/
public interface SinkConnectorContext extends ConnectorContext {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.connector.api.component.task.sink;

import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.Schema;

import java.util.Objects;

/**
* sink connect record
*/
public class SinkRecord extends ConnectRecord<SinkRecord> {

private final Integer queueId;
private final String brokerName;
private final long queueOffset;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OpenMessaging Connect是一个通用的框架,不适合将RMQ个性化的元素(queueId,brokerName,queueOffset)放到框架里。

public SinkRecord(String brokerName, long queueOffset,String topic, Integer queueId, Schema schema, Object data) {
this(brokerName, queueOffset, topic, queueId, null, null, schema, data ,null );
}

public SinkRecord(String brokerName, long queueOffset, String topic, Integer queueId, Schema keySchema, Object key, Schema schema, Object data) {
this(brokerName, queueOffset, topic, queueId, null, keySchema, key, schema, data ,null );
}

public SinkRecord(String brokerName, long queueOffset, String topic, Integer queueId, Schema keySchema, Object key, Schema schema, Object data, KeyValue extensions) {
this(brokerName, queueOffset, topic, queueId, null, keySchema, key, schema, data ,extensions );
}

public SinkRecord(String brokerName, long queueOffset, String topic, Integer queueId, Long timestamp, Schema keySchema, Object key, Schema schema, Object data, KeyValue extensions) {
super(topic, timestamp, keySchema, key, schema, data, extensions);
this.brokerName = brokerName;
this.queueOffset = queueOffset;
this.queueId = queueId;
}


public Integer queueId(){
return queueId;
}

public String brokerName(){
return brokerName;
}

public long queueOffset(){
return queueOffset;
}

/**
* new record
*
* @param topic
* @param schema
* @param data
* @param timestamp
* @return
*/
@Override
public SinkRecord newRecord(String topic, Long timestamp, Schema keySchema, Object key,Schema schema, Object data) {
return newRecord(topic, timestamp, keySchema, key, schema, data, null);
}

/**
* new record
*
* @param topic
* @param schema
* @param data
* @param timestamp
* @param extensions
* @return
*/
@Override
public SinkRecord newRecord(String topic, Long timestamp, Schema keySchema, Object key, Schema schema, Object data, KeyValue extensions) {
return new SinkRecord(brokerName(), queueOffset(), topic, queueId(), timestamp, keySchema, key, schema, data, extensions);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof SinkRecord)) return false;
if (!super.equals(o)) return false;
SinkRecord that = (SinkRecord) o;
return queueOffset == that.queueOffset && Objects.equals(queueId, that.queueId) && Objects.equals(brokerName, that.brokerName);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), queueId, brokerName, queueOffset);
}

@Override
public String toString() {
return "SinkRecord{" +
"queueId=" + queueId +
", brokerName='" + brokerName + '\'' +
", queueOffset=" + queueOffset +
"} " + super.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public void init(SinkTaskContext sinkTaskContext) {
/**
* Put the records to the sink
*
* @param sinkRecords sink records
* @param records sink records
*/
public abstract void put(List<ConnectRecord> sinkRecords) throws ConnectException;
public abstract void put(List<SinkRecord> records) throws ConnectException;

/**
* Flush the records to the sink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,8 @@
*/
public abstract class SourceConnector extends Connector {

@Override
protected SourceConnectorContext context() {
return (SourceConnectorContext) connectorContext;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.connector.api.component.task.source;


import io.openmessaging.connector.api.component.connector.ConnectorContext;
import io.openmessaging.connector.api.storage.OffsetStorageReader;

/**
* source connector context
*/
public interface SourceConnectorContext extends ConnectorContext {

/**
* @return the OffsetStorageReader for this connector.
*/
OffsetStorageReader offsetStorageReader();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.connector.api.component.task.source;

import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.RecordPosition;
import io.openmessaging.connector.api.data.Schema;

import java.util.Objects;

/**
* source connect record
*/
public class SourceRecord extends ConnectRecord<SourceRecord> {

private final RecordPosition position;

public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Schema schema, Object data) {
this(recordPartition, recordOffset, topic, null, null, null, schema , data, null);
}

public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset,String topic, Schema keySchema, Object key, Schema schema, Object data) {
this(recordPartition, recordOffset, topic, keySchema, key, schema , data, null);
}

public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Schema keySchema, Object key, Schema valueSchema, Object value, KeyValue extensions) {
this(recordPartition, recordOffset, topic, null, keySchema, key, valueSchema, value, extensions);
}

public SourceRecord(RecordPartition recordPartition, RecordOffset recordOffset, String topic, Long timestamp, Schema keySchema, Object key, Schema schema, Object data, KeyValue extensions) {
super(topic, timestamp, keySchema, key, schema, data, extensions);
this.position = new RecordPosition(recordPartition, recordOffset);
}

public RecordPosition position() {
return position;
}

/**
* new record
*
* @param topic
* @param schema
* @param data
* @param timestamp
* @return
*/
@Override
public SourceRecord newRecord(String topic, Long timestamp, Schema keySchema, Object key,Schema schema, Object data) {
return newRecord(topic, timestamp, keySchema, key, schema , data, null );
}

/**
* new record
*
* @param topic
* @param schema
* @param data
* @param timestamp
* @param extensions
* @return
*/
@Override
public SourceRecord newRecord(String topic, Long timestamp, Schema keySchema, Object key,Schema schema, Object data, KeyValue extensions) {
return new SourceRecord(position().getPartition(), position().getOffset(), topic, timestamp, keySchema, key, schema, data, extensions);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof SourceRecord)) return false;
if (!super.equals(o)) return false;
SourceRecord that = (SourceRecord) o;
return Objects.equals(position, that.position);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), position);
}

@Override
public String toString() {
return "SourceRecord{" +
"position=" + position +
"} " + super.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,41 +35,25 @@ public void init(SourceTaskContext sourceTaskContext) {
/**
* Poll this source task for new records.
*
* @return connectRecord list
* @return SourceRecord list
* @throws InterruptedException task thread interupt exception
*/
public abstract List<ConnectRecord> poll() throws InterruptedException;

public abstract List<SourceRecord> poll() throws InterruptedException;

/**
* batch commit
* @param records
* @param metadata
*/
public void commit(final List<ConnectRecord> records, Map<String,String> metadata) {
public void commit(final List<SourceRecord> records, Map<String,String> metadata) throws InterruptedException {
}
/**
* commit record
* @param record
* @param metadata
*/
public void commit(final ConnectRecord record, Map<String,String> metadata) {
commit(record);
}
public void commit(final SourceRecord record, Map<String,String> metadata) throws InterruptedException {

/**
* <p>
* Commit an individual {@link ConnectRecord} when the callback from the producer client is received.
* </p>
* <p>
* SourceTasks are not required to implement this functionality;Connect System will record offsets
* automatically. This hook is provided for systems that also need to store offsets internally
* in their own system.
* </p>
*
* @param record connect record
*/
public void commit(final ConnectRecord record) {
}

/**
Expand Down
Loading