Updated Code for #758 #2297

Open · wants to merge 1 commit into base: next
Changes from all commits
18 changes: 16 additions & 2 deletions src/core/ByteBufferList.java
@@ -58,6 +58,20 @@ public void add(final byte[] buf, final int offset, final int len) {
total_length += len;
}

/**
 * Removes the most recently added segment from the segment list and returns it.
 *
 * @return the {@link BufferSegment} that was most recently added, or
 * {@code null} if no segments exist
 */
public BufferSegment removeLastSegment() {
if (segments.isEmpty()) {
return null;
}
BufferSegment seg = segments.remove(segments.size() - 1);
total_length -= seg.len;
return seg;
}

/**
* Get the most recently added segment.
*
@@ -73,7 +87,7 @@ public byte[] getLastSegment() {

/**
* Get the number of segments that have been added to this buffer list.
*
* @return the segment count
*/
public int segmentCount() {
@@ -82,7 +96,7 @@ public int segmentCount() {

/**
* Get the accumulated bytes as a single byte array (may be a zero-byte array if empty).
*
* @param padding the number of additional bytes to include at the end
* @return the accumulated bytes
*/
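Reviewer note: a minimal usage sketch of the new `removeLastSegment()`, assuming only the ByteBufferList API shown above; the helper name `replaceLast` is hypothetical.

```java
// Reviewer sketch, not part of the diff: how a caller can use the new
// removeLastSegment() to undo the most recent add(). The helper name
// replaceLast is hypothetical; only the ByteBufferList API above is assumed.
static void replaceLast(final ByteBufferList list, final byte[] replacement) {
  if (list.removeLastSegment() != null) {          // drop the stale segment
    list.add(replacement, 0, replacement.length);  // append the corrected bytes
  }
}
```

This mirrors the pattern CompactionQueue uses below: pop the last qualifier/value pair, then re-add the merged one.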
27 changes: 27 additions & 0 deletions src/core/ColumnDatapointIterator.java
@@ -120,6 +120,19 @@ public void writeToBuffers(ByteBufferList compQualifier, ByteBufferList compValue
compValue.add(value, value_offset, current_val_length);
}

/**
 * Write a new qualifier and its associated value to the compacted qualifier
 * and compacted value buffers, respectively.
 *
 * @param newQualifier the new qualifier
 * @param newVal the new value
 * @param compactedQual the qualifiers buffer
 * @param compactedVal the values buffer
 */
public void writeToBuffers(byte[] newQualifier, byte[] newVal, ByteBufferList compactedQual, ByteBufferList compactedVal) {
compactedQual.add(newQualifier, 0, newQualifier.length);
compactedVal.add(newVal, 0, newVal.length);
}

public void writeToBuffersFromOffset(ByteBufferList compQualifier, ByteBufferList compValue, Pair<Integer, Integer> offsets, Pair<Integer, Integer> offsetLengths) {
compQualifier.add(qualifier, offsets.getKey(), offsetLengths.getKey());
compValue.add(value, offsets.getValue(), offsetLengths.getValue());
@@ -186,6 +199,20 @@ private boolean update() {
return true;
}

/**
* @return the flags mask from the qualifier
*/
public short getFlagsFromQualifier() {
return Internal.getFlagsFromQualifier(qualifier, qualifier_offset);
}

/**
* @return the column timestamp
*/
public long getColumnTimestamp() {
return column_timestamp;
}

// order in ascending order by timestamp, descending order by row timestamp (so we find the
// entry we are going to keep first, and don't have to copy over it)
@Override
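For context, a hedged sketch (illustrative, not part of the diff) of how a compaction pass might combine the two new accessors to rebuild a qualifier for the current column:

```java
// Illustrative sketch: combining the two new accessors to rebuild a qualifier
// for the column the iterator is positioned on. "col" is a
// ColumnDatapointIterator assumed to be in scope.
final short flags = col.getFlagsFromQualifier();   // type and length bits
final long timestamp = col.getColumnTimestamp();   // column timestamp
final byte[] rebuilt = Internal.buildQualifier(timestamp, flags);
```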
49 changes: 48 additions & 1 deletion src/core/CompactionQueue.java
@@ -558,7 +558,7 @@ private void defaultMergeDataPoints(ByteBufferList compacted_qual,
final byte[] discardedVal = col.getCopyOfCurrentValue();
if (!Arrays.equals(existingVal, discardedVal)) {
duplicates_different.incrementAndGet();
if (!tsdb.config.fix_duplicates()) {
if (!tsdb.config.fix_duplicates() && !tsdb.config.sum_duplicates()) {
throw new IllegalDataException("Duplicate timestamp for key="
+ Arrays.toString(row.get(0).key()) + ", ms_offset=" + ts + ", older="
+ Arrays.toString(existingVal) + ", newer=" + Arrays.toString(discardedVal)
@@ -570,6 +570,53 @@
} else {
duplicates_same.incrementAndGet();
}

if (tsdb.config.sum_duplicates()) {
short current_flags = Internal.getFlagsFromQualifier(compacted_qual.getLastSegment());
short discarded_flags = col.getFlagsFromQualifier();

boolean is_current_long = ((current_flags & Const.FLAG_FLOAT) == 0x0);
boolean is_discarded_long = ((discarded_flags & Const.FLAG_FLOAT) == 0x0);

/* the current value flags determine the type of the output (long or double) */

double current_val = 0.0;
double discarded_val = 0.0;

if (is_current_long) {
current_val = Internal.extractIntegerValue(existingVal, 0, (byte)current_flags);
} else {
current_val = Internal.extractFloatingPointValue(existingVal, 0, (byte) current_flags);
}

if (is_discarded_long) {
discarded_val = Internal.extractIntegerValue(discardedVal, 0, (byte)discarded_flags);
} else {
discarded_val = Internal.extractFloatingPointValue(discardedVal, 0, (byte) discarded_flags);
}

current_val += discarded_val;
byte[] new_val = null;
current_flags = 0;

if (is_current_long) {
new_val = Bytes.fromLong((long) current_val);
current_flags = (short) (new_val.length - 1);
} else {
new_val = Bytes.fromLong(Double.doubleToRawLongBits(current_val));
current_flags = Const.FLAG_FLOAT | 0x7;
}

final byte[] new_qualifier = Internal.buildQualifier(col.getColumnTimestamp(), current_flags);

/* now remove the last value & qualifier (existing one) and write the updated value and qualifier */

prevTs = ts;
compacted_val.removeLastSegment();
compacted_qual.removeLastSegment();
col.writeToBuffers(new_qualifier, new_val, compacted_qual, compacted_val);
ms_in_row |= col.isMilliseconds();
s_in_row |= !col.isMilliseconds();
} else {
prevTs = ts;
col.writeToBuffers(compacted_qual, compacted_val);
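To make the flag handling above concrete, a worked example (illustrative only), assuming OpenTSDB's qualifier encoding where `Const.FLAG_FLOAT` marks floating-point values and the low bits store the value length minus one:

```java
// Worked example, illustrative only: the flag arithmetic used above.
final short longFlags = 0x7;                                 // 8-byte long value
final boolean isLong = (longFlags & Const.FLAG_FLOAT) == 0;  // true: float bit clear
final double sum = 40.0 + 2.0;                               // merged duplicate values
final byte[] newVal = Bytes.fromLong((long) sum);            // re-encode as 8 bytes
final short newFlags = (short) (newVal.length - 1);          // 0x7 again
```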
11 changes: 9 additions & 2 deletions src/core/IncomingDataPoints.java
@@ -312,12 +312,18 @@ public Deferred<Object> call(final Boolean allowed) throws Exception {
point.setDurable(!batch_import);
return tsdb.client.append(point);/* .addBoth(cb) */
} else {
final PutRequest point = RequestBuilder.buildPutRequest(tsdb.getConfig(), tsdb.table, row, TSDB.FAMILY,
qualifier, value, timestamp);
boolean isLong = ((flags & Const.FLAG_FLOAT) == 0x0);
if (isLong && tsdb.config.use_hbase_counters()) {
AtomicIncrementRequest counterRequest = new AtomicIncrementRequest(tsdb.table, row, TSDB.FAMILY, qualifier, Bytes.getLong(value));
return tsdb.client.atomicIncrement(counterRequest, !batch_import);
} else {
final PutRequest point = new PutRequest(tsdb.table, row, TSDB.FAMILY,
qualifier, value);
point.setDurable(!batch_import);
return tsdb.client.put(point)/* .addBoth(cb) */;
}
}
}
@Override
public String toString() {
return "IncomingDataPoints.addPointInternal Write Callback";
@@ -348,6 +354,7 @@ private long baseTime() {

public Deferred<Object> addPoint(final long timestamp, final long value) {
final byte[] v;

if (Byte.MIN_VALUE <= value && value <= Byte.MAX_VALUE) {
v = new byte[] { (byte) value };
} else if (Short.MIN_VALUE <= value && value <= Short.MAX_VALUE) {
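The counter branch above replaces a plain put with a server-side increment for integer values. A minimal sketch of the asynchbase call (not part of the diff; `table`, `row`, and `qualifier` are assumed to be in scope):

```java
// Illustrative sketch: the counter path swaps a PutRequest for a server-side
// increment, so two concurrent writers adding +1 and +5 end up with 6 stored,
// with no client-side read-modify-write.
final AtomicIncrementRequest incr =
    new AtomicIncrementRequest(table, row, TSDB.FAMILY, qualifier, 5L);
client.atomicIncrement(incr);  // Deferred<Long> resolving to the new total
```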
88 changes: 87 additions & 1 deletion src/core/TSDB.java
@@ -34,6 +34,7 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.AppendRequest;
import org.hbase.async.Bytes;
import org.hbase.async.Bytes.ByteMap;
@@ -1205,6 +1206,7 @@ public Deferred<Object> addPoint(final String metric,
final long value,
final Map<String, String> tags) {
final byte[] v;

if (Byte.MIN_VALUE <= value && value <= Byte.MAX_VALUE) {
v = new byte[] { (byte) value };
} else if (Short.MIN_VALUE <= value && value <= Short.MAX_VALUE) {
@@ -1393,8 +1395,14 @@ public Deferred<Object> call(final Boolean allowed) throws Exception {
result = client.append(point);
} else if (!isHistogram(qualifier)) {
scheduleForCompaction(row, (int) base_time);
final PutRequest point = RequestBuilder.buildPutRequest(config, table, row, FAMILY, qualifier, value, timestamp);
boolean isLong = ((flags & Const.FLAG_FLOAT) == 0x0);
if (isLong && config.use_hbase_counters()) {
AtomicIncrementRequest counterRequest = new AtomicIncrementRequest(table, row, FAMILY, qualifier, Bytes.getLong(value));
result = client.atomicIncrement(counterRequest);
} else {
final PutRequest point = new PutRequest(table, row, FAMILY, qualifier, value);
result = client.put(point);
}
} else {
scheduleForCompaction(row, (int) base_time);
final PutRequest histo_point = new PutRequest(table, row, FAMILY, qualifier, value);
@@ -2428,4 +2436,82 @@ public void response(Runnable run) {
rpcResponder.response(run);
}


/**
 * Store a single long value data point in the TSDB as an HBase counter.
 * @param metric A non-empty string.
 * @param timestamp The timestamp associated with the value.
 * @param valueAsLong The value of the data point.
 * @param tags The tags on this series. This map must be non-empty.
 * @return A deferred object that indicates the completion of the request.
 * The {@link Object} has no special meaning and can be {@code null} (think
 * of it as {@code Deferred<Void>}). But you probably want to attach at
 * least an errback to this {@code Deferred} to handle failures.
 * @throws IllegalArgumentException if the timestamp is less than or equal
 * to the previous timestamp added or 0 for the first timestamp, or if the
 * difference with the previous timestamp is too large.
 * @throws IllegalArgumentException if the metric name is empty or contains
 * illegal characters.
 * @throws IllegalArgumentException if the tags list is empty or one of the
 * elements contains illegal characters.
 * @throws HBaseException (deferred) if there was a problem while persisting
 * data.
 */
public Deferred<Object> addCounter(String metric, long timestamp, long valueAsLong, HashMap<String, String> tags) {

final byte[] value = Bytes.fromLong(valueAsLong);
final short flags = (short) (value.length - 1);

// we only accept positive unix epoch timestamps in seconds or milliseconds
if (timestamp < 0 || ((timestamp & Const.SECOND_MASK) != 0 &&
timestamp > 9999999999999L)) {
throw new IllegalArgumentException((timestamp < 0 ? "negative " : "bad")
+ " timestamp=" + timestamp
+ " when trying to add value=" + Arrays.toString(value) + '/' + flags
+ " to metric=" + metric + ", tags=" + tags);
}
IncomingDataPoints.checkMetricAndTags(metric, tags);
final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, tags);
final long base_time;
final byte[] qualifier = Internal.buildQualifier(timestamp, flags);

if ((timestamp & Const.SECOND_MASK) != 0) {
// drop the ms timestamp to seconds to calculate the base timestamp
base_time = ((timestamp / 1000) -
((timestamp / 1000) % Const.MAX_TIMESPAN));
} else {
base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN));
}

Bytes.setInt(row, (int) base_time, metrics.width());
scheduleForCompaction(row, (int) base_time);

AtomicIncrementRequest counterRequest = new AtomicIncrementRequest(table, row, FAMILY, qualifier, Bytes.getLong(value));
client.atomicIncrement(counterRequest);

if (!config.enable_realtime_ts() && !config.enable_tsuid_incrementing() &&
!config.enable_tsuid_tracking() && rt_publisher == null) {
return null;
}

final byte[] tsuid = UniqueId.getTSUIDFromKey(row, METRICS_WIDTH,
Const.TIMESTAMP_BYTES);

// for busy TSDs we may only enable TSUID tracking, storing a 1 in the
// counter field for a TSUID with the proper timestamp. If the user would
// rather have TSUID incrementing enabled, that will trump the PUT
if (config.enable_tsuid_tracking() && !config.enable_tsuid_incrementing()) {
final PutRequest tracking = new PutRequest(meta_table, tsuid,
TSMeta.FAMILY(), TSMeta.COUNTER_QUALIFIER(), Bytes.fromLong(1));
client.put(tracking);
} else if (config.enable_tsuid_incrementing() || config.enable_realtime_ts()) {
TSMeta.incrementAndGetCounter(TSDB.this, tsuid);
}

if (rt_publisher != null) {
rt_publisher.sinkDataPoint(metric, timestamp, value, tags, tsuid, flags);
}
return null;
}
}
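A hypothetical caller of the new `addCounter()` (metric name and tags are illustrative, not from the diff):

```java
// Hypothetical usage, not part of the diff: counting requests per host.
final HashMap<String, String> tags = new HashMap<String, String>();
tags.put("host", "web01");
tsdb.addCounter("app.requests.count", System.currentTimeMillis(), 1L, tags);
```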