diff --git a/src/core/ByteBufferList.java b/src/core/ByteBufferList.java
index 443750e082..9f744bfefc 100644
--- a/src/core/ByteBufferList.java
+++ b/src/core/ByteBufferList.java
@@ -58,6 +58,20 @@ public void add(final byte[] buf, final int offset, final int len) {
     total_length += len;
   }
 
+  /**
+   * Removes the most recently added segment from the segment list, subtracts its length from the running total length, and returns it to the caller.
+   *
+   * @return the removed {@code BufferSegment}, or {@code null} if the list holds no segments
+   */
+  public BufferSegment removeLastSegment() {
+    if (segments.isEmpty()) {
+      return null;
+    }
+    BufferSegment seg = segments.remove(segments.size() - 1);
+    total_length -= seg.len;
+    return seg;
+  }
+
   /**
    * Get the most recently added segment.
    *
@@ -73,7 +87,7 @@ public byte[] getLastSegment() {
 
   /**
    * Get the number of segments that have added to this buffer list.
-   * 
+   *
    * @return the segment count
    */
   public int segmentCount() {
@@ -82,7 +96,7 @@ public int segmentCount() {
 
   /**
    * Get the accumulated bytes as a single byte array (may be a zero-byte array if empty).
-   * 
+   *
    * @param padding the number of additional bytes to include at the end
    * @return the accumulated bytes
    */
diff --git a/src/core/ColumnDatapointIterator.java b/src/core/ColumnDatapointIterator.java
index 8df8a20626..0d1858678b 100644
--- a/src/core/ColumnDatapointIterator.java
+++ b/src/core/ColumnDatapointIterator.java
@@ -120,6 +120,19 @@ public void writeToBuffers(ByteBufferList compQualifier, ByteBufferList compValu
     compValue.add(value, value_offset, current_val_length);
   }
 
+  /**
+   * Write a new qualifier and its associated value to the compacted qualifier buffer and compacted values buffer respectively.
+   *
+   * @param newQualifier the qualifier bytes, appended in full to {@code compactedQual}
+   * @param newVal the value bytes, appended in full to {@code compactedVal}
+   * @param compactedQual the buffer list that receives the qualifier
+   * @param compactedVal the buffer list that receives the value
+   */
+  public void writeToBuffers(byte[] newQualifier, byte[] newVal, ByteBufferList compactedQual, ByteBufferList compactedVal) {
+    compactedQual.add(newQualifier, 0, newQualifier.length);
+    compactedVal.add(newVal, 0, newVal.length);
+  }
+
   public void writeToBuffersFromOffset(ByteBufferList compQualifier, ByteBufferList compValue, Pair offsets, Pair offsetLengths) {
     compQualifier.add(qualifier, offsets.getKey(), offsetLengths.getKey());
     compValue.add(value, offsets.getValue(), offsetLengths.getValue());
@@ -186,6 +199,20 @@ private boolean update() {
     return true;
   }
 
+  /**
+   * @return the flags that {@code Internal.getFlagsFromQualifier} reads from this column's qualifier at the current qualifier offset
+   */
+  public short getFlagsFromQualifier() {
+    return Internal.getFlagsFromQualifier(qualifier, qualifier_offset);
+  }
+
+  /**
+   * @return the value of this iterator's {@code column_timestamp} field
+   */
+  public long getColumnTimestamp() {
+    return column_timestamp;
+  }
+
   // order in ascending order by timestamp, descending order by row timestamp (so we find the
   // entry we are going to keep first, and don't have to copy over it)
   @Override
diff --git a/src/core/CompactionQueue.java b/src/core/CompactionQueue.java
index 857d60fbd2..859197bbdd 100644
--- a/src/core/CompactionQueue.java
+++ b/src/core/CompactionQueue.java
@@ -558,7 +558,7 @@ private void defaultMergeDataPoints(ByteBufferList compacted_qual,
         final byte[] discardedVal = col.getCopyOfCurrentValue();
         if (!Arrays.equals(existingVal, discardedVal)) {
           duplicates_different.incrementAndGet();
-          if (!tsdb.config.fix_duplicates()) {
+          if (!tsdb.config.fix_duplicates() && !tsdb.config.sum_duplicates()) {
             throw new IllegalDataException("Duplicate timestamp for key="
                 + Arrays.toString(row.get(0).key()) + ", ms_offset=" + ts + ", older="
                 + Arrays.toString(existingVal) + ", newer=" + Arrays.toString(discardedVal)
@@ -570,6 +570,53 @@ private void
defaultMergeDataPoints(ByteBufferList compacted_qual,
         } else {
           duplicates_same.incrementAndGet();
         }
+
+        if (tsdb.config.sum_duplicates()) {
+          // Fold the duplicate into the point already written. The existing point's
+          // type (long or double) decides the type of the summed output.
+          final short current_flags = Internal.getFlagsFromQualifier(compacted_qual.getLastSegment());
+          final short discarded_flags = col.getFlagsFromQualifier();
+          final boolean is_current_long = ((current_flags & Const.FLAG_FLOAT) == 0x0);
+          final boolean is_discarded_long = ((discarded_flags & Const.FLAG_FLOAT) == 0x0);
+
+          final byte[] new_val;
+          final short new_flags;
+          if (is_current_long && is_discarded_long) {
+            // Both integers: stay in long arithmetic so large sums are not rounded by a double.
+            final long sum = Internal.extractIntegerValue(existingVal, 0, (byte) current_flags)
+                + Internal.extractIntegerValue(discardedVal, 0, (byte) discarded_flags);
+            new_val = Bytes.fromLong(sum);
+            new_flags = (short) (new_val.length - 1);
+          } else {
+            final double current_val = is_current_long
+                ? Internal.extractIntegerValue(existingVal, 0, (byte) current_flags)
+                : Internal.extractFloatingPointValue(existingVal, 0, (byte) current_flags);
+            // Decode the discarded value with its own flags, not the existing point's.
+            final double discarded_val = is_discarded_long
+                ? Internal.extractIntegerValue(discardedVal, 0, (byte) discarded_flags)
+                : Internal.extractFloatingPointValue(discardedVal, 0, (byte) discarded_flags);
+            final double sum = current_val + discarded_val;
+            if (is_current_long) {
+              new_val = Bytes.fromLong((long) sum);
+              new_flags = (short) (new_val.length - 1);
+            } else {
+              new_val = Bytes.fromLong(Double.doubleToRawLongBits(sum));
+              new_flags = Const.FLAG_FLOAT | 0x7;
+            }
+          }
+
+          // NOTE(review): confirm getColumnTimestamp() is the data point time that
+          // buildQualifier() expects, and not the time the cell was written.
+          final byte[] new_qualifier = Internal.buildQualifier(col.getColumnTimestamp(), new_flags);
+
+          // Remove the existing value & qualifier and write the summed replacement.
+          prevTs = ts;
+          compacted_val.removeLastSegment();
+          compacted_qual.removeLastSegment();
+          col.writeToBuffers(new_qualifier, new_val, compacted_qual, compacted_val);
+          ms_in_row |= col.isMilliseconds();
+          s_in_row |= !col.isMilliseconds();
+        }
       } else {
         prevTs = ts;
         col.writeToBuffers(compacted_qual, compacted_val);
diff --git a/src/core/IncomingDataPoints.java b/src/core/IncomingDataPoints.java
index 108e641a6c..94795d74de 100644
--- 
a/src/core/IncomingDataPoints.java
+++ b/src/core/IncomingDataPoints.java
@@ -312,12 +312,28 @@ public Deferred call(final Boolean allowed) throws Exception {
           point.setDurable(!batch_import);
           return tsdb.client.append(point);/* .addBoth(cb) */
         } else {
+          final boolean isLong = ((flags & Const.FLAG_FLOAT) == 0x0);
+          if (isLong && tsdb.config.use_hbase_counters()) {
+            // The value is length-encoded (addPoint() emits as little as one byte), so decode
+            // it with its flags; Bytes.getLong() reads 8 bytes and would overrun shorter arrays.
+            final long delta = Internal.extractIntegerValue(value, 0, (byte) flags);
+            // HBase stores counters as 8-byte longs, so the qualifier must advertise
+            // an 8-byte integer, as TSDB.addCounter() does.
+            final byte[] counter_qualifier = Internal.buildQualifier(timestamp, (short) 0x7);
+            // Fully qualified because this diff adds no import of the class to this file.
+            final org.hbase.async.AtomicIncrementRequest counterRequest =
+                new org.hbase.async.AtomicIncrementRequest(
+                    tsdb.table, row, TSDB.FAMILY, counter_qualifier, delta);
+            // NOTE(review): confirm the Deferred returned by atomicIncrement() is
+            // assignable to this callback's return type.
+            return tsdb.client.atomicIncrement(counterRequest, !batch_import);
+          }
           final PutRequest point = RequestBuilder.buildPutRequest(tsdb.getConfig(), tsdb.table, row, TSDB.FAMILY,
               qualifier, value, timestamp);
           point.setDurable(!batch_import);
           return tsdb.client.put(point)/* .addBoth(cb) */;
         }
       }
       @Override
       public String toString() {
         return "IncomingDataPoints.addPointInternal Write Callback";
@@ -348,6 +364,7 @@ private long baseTime() {
 
   public Deferred addPoint(final long timestamp, final long value) {
     final byte[] v;
+
     if (Byte.MIN_VALUE <= value && value <= Byte.MAX_VALUE) {
       v = new byte[] { (byte) value };
     } else if (Short.MIN_VALUE <= value && value <= Short.MAX_VALUE) {
diff --git a/src/core/TSDB.java b/src/core/TSDB.java
index 53e6fb15fc..36853cf1d1 100644
--- a/src/core/TSDB.java
+++ b/src/core/TSDB.java
@@ -34,6 +34,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.hbase.async.AppendRequest;
+import org.hbase.async.AtomicIncrementRequest;
 import org.hbase.async.Bytes;
 import org.hbase.async.Bytes.ByteMap;
@@ -1205,6 +1206,7 @@ public Deferred addPoint(final String metric,
                                    final long value,
                                    final Map tags) {
     final byte[] v;
+
     if (Byte.MIN_VALUE <= value && value <= Byte.MAX_VALUE) {
       v = new byte[] { (byte) value };
     } else if (Short.MIN_VALUE <= value && value <= Short.MAX_VALUE) {
@@ -1393,8 +1395,24 @@ public Deferred call(final Boolean allowed) throws Exception {
           result = client.append(point);
         } else if (!isHistogram(qualifier)) {
           scheduleForCompaction(row, (int) base_time);
-          final PutRequest point = RequestBuilder.buildPutRequest(config, table, row, FAMILY, qualifier, value, timestamp);
-          result = client.put(point);
+          final boolean isLong = ((flags & Const.FLAG_FLOAT) == 0x0);
+          if (isLong && config.use_hbase_counters()) {
+            // The value is length-encoded (addPoint() emits as little as one byte), so decode
+            // it with its flags; Bytes.getLong() reads 8 bytes and would overrun shorter arrays.
+            final long delta = Internal.extractIntegerValue(value, 0, (byte) flags);
+            // HBase stores counters as 8-byte longs, so the qualifier must advertise
+            // an 8-byte integer, as addCounter() does.
+            final byte[] counter_qualifier = Internal.buildQualifier(timestamp, (short) 0x7);
+            final AtomicIncrementRequest counterRequest = new AtomicIncrementRequest(
+                table, row, FAMILY, counter_qualifier, delta);
+            // NOTE(review): confirm the Deferred returned by atomicIncrement() is
+            // assignable to the declared type of result.
+            result = client.atomicIncrement(counterRequest);
+          } else {
+            // Keep RequestBuilder: it receives the config and timestamp that a bare PutRequest drops.
+            final PutRequest point = RequestBuilder.buildPutRequest(config, table, row, FAMILY, qualifier, value, timestamp);
+            result = client.put(point);
+          }
         } else {
           scheduleForCompaction(row, (int) base_time);
           final PutRequest histo_point = new PutRequest(table, row, FAMILY, qualifier, value);
@@ -2428,4 +2446,86 @@ public void response(Runnable run) {
     rpcResponder.response(run);
   }
 
+  /**
+   * Stores a single long data point as an HBase counter: the value is applied
+   * with an atomic increment, so it adds to whatever the cell already holds.
+   * @param metric A non-empty string.
+   * @param timestamp The timestamp associated with the value.
+   * @param valueAsLong The amount to add to the counter cell.
+   * @param tags The tags on this series. This map must be non-empty.
+   * @return A deferred object that indicates the completion of the increment.
+   * You probably want to attach at least an errback to this {@code Deferred}
+   * to handle failures.
+   * @throws IllegalArgumentException if the timestamp is negative, or is a
+   * millisecond timestamp greater than 9999999999999.
+   * @throws IllegalArgumentException if
+   * {@code IncomingDataPoints.checkMetricAndTags} rejects the metric or tags.
+   * @throws HBaseException (deferred) if there was a problem while persisting
+   * data.
+   */
+  public Deferred<Object> addCounter(final String metric, final long timestamp,
+      final long valueAsLong, final HashMap<String, String> tags) {
+    final byte[] value = Bytes.fromLong(valueAsLong);
+    final short flags = (short) (value.length - 1);
+
+    // we only accept positive unix epoch timestamps in seconds or milliseconds
+    if (timestamp < 0 || ((timestamp & Const.SECOND_MASK) != 0 &&
+        timestamp > 9999999999999L)) {
+      throw new IllegalArgumentException((timestamp < 0 ? "negative " : "bad")
+          + " timestamp=" + timestamp
+          + " when trying to add value=" + Arrays.toString(value) + '/' + flags
+          + " to metric=" + metric + ", tags=" + tags);
+    }
+    IncomingDataPoints.checkMetricAndTags(metric, tags);
+    final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, tags);
+    final byte[] qualifier = Internal.buildQualifier(timestamp, flags);
+
+    final long base_time;
+    if ((timestamp & Const.SECOND_MASK) != 0) {
+      // drop the ms timestamp to seconds to calculate the base timestamp
+      base_time = ((timestamp / 1000) -
+          ((timestamp / 1000) % Const.MAX_TIMESPAN));
+    } else {
+      base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN));
+    }
+
+    Bytes.setInt(row, (int) base_time, metrics.width());
+    scheduleForCompaction(row, (int) base_time);
+
+    final AtomicIncrementRequest counterRequest =
+        new AtomicIncrementRequest(table, row, FAMILY, qualifier, valueAsLong);
+    // Hand the increment's outcome to the caller (rather than returning null)
+    // so callbacks and errbacks can be chained; the callback only widens the
+    // Deferred's result type.
+    final Deferred<Object> result = client.atomicIncrement(counterRequest)
+        .addCallback(new Callback<Object, Long>() {
+          public Object call(final Long new_value) {
+            return new_value;
+          }
+        });
+
+    if (!config.enable_realtime_ts() && !config.enable_tsuid_incrementing() &&
+        !config.enable_tsuid_tracking() && rt_publisher == null) {
+      return result;
+    }
+
+    final byte[] tsuid = UniqueId.getTSUIDFromKey(row, METRICS_WIDTH,
+        Const.TIMESTAMP_BYTES);
+
+    // for busy TSDs we may only enable TSUID tracking, storing a 1 in the
+    // counter field for a TSUID with the proper timestamp. If the user would
+    // rather have TSUID incrementing enabled, that will trump the PUT
+    if (config.enable_tsuid_tracking() && !config.enable_tsuid_incrementing()) {
+      final PutRequest tracking = new PutRequest(meta_table, tsuid,
+          TSMeta.FAMILY(), TSMeta.COUNTER_QUALIFIER(), Bytes.fromLong(1));
+      client.put(tracking);
+    } else if (config.enable_tsuid_incrementing() || config.enable_realtime_ts()) {
+      TSMeta.incrementAndGetCounter(this, tsuid);
+    }
+
+    if (rt_publisher != null) {
+      rt_publisher.sinkDataPoint(metric, timestamp, value, tags, tsuid, flags);
+    }
+    return result;
+  }
 }
diff --git a/src/tsd/PutDataPointCounterRpc.java b/src/tsd/PutDataPointCounterRpc.java
new file mode 100644
index 0000000000..86096350e6
--- /dev/null
+++ b/src/tsd/PutDataPointCounterRpc.java
@@ -0,0 +1,282 @@
+// This file is part of OpenTSDB.
+// Copyright (C) 2010-2012 The OpenTSDB Authors.
+//
+// This program is free software: you can redistribute it and/or modify it
+// under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 2.1 of the License, or (at your
+// option) any later version. This program is distributed in the hope that it
+// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details. You should have received a copy
+// of the GNU Lesser General Public License along with this program. If not,
+// see <http://www.gnu.org/licenses/>.
+package net.opentsdb.tsd;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+
+import net.opentsdb.core.IncomingDataPoint;
+import net.opentsdb.core.TSDB;
+import net.opentsdb.core.Tags;
+import net.opentsdb.stats.StatsCollector;
+import net.opentsdb.uid.NoSuchUniqueName;
+
+public class PutDataPointCounterRpc implements TelnetRpc, HttpRpc {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PutDataPointCounterRpc.class);
+  private static final AtomicLong requests = new AtomicLong();
+  private static final AtomicLong hbase_errors = new AtomicLong();
+  private static final AtomicLong invalid_values = new AtomicLong();
+  private static final AtomicLong illegal_arguments = new AtomicLong();
+  private static final AtomicLong unknown_metrics = new AtomicLong();
+
+  /**
+   * Records the request and error counters kept by this class.
+   *
+   * @param collector The collector to use.
+   */
+  public static void collectStats(final StatsCollector collector) {
+    collector.record("rpc.received", requests, "type=put");
+    collector.record("rpc.errors", hbase_errors, "type=hbase_errors");
+    collector.record("rpc.errors", invalid_values, "type=invalid_values");
+    collector.record("rpc.errors", illegal_arguments, "type=illegal_arguments");
+    collector.record("rpc.errors", unknown_metrics, "type=unknown_metrics");
+  }
+
+  public Deferred<Object> execute(final TSDB tsdb, final Channel chan,
+      final String[] cmd) { // telnet entry point: parse errors are written back to the channel
+    requests.incrementAndGet();
+    String errmsg = null;
+    try {
+      final class PutErrback implements Callback<Exception, Exception> {
+        public Exception call(final Exception arg) {
+          if (chan.isConnected()) {
+            chan.write("putctr: HBase error: " + arg.getMessage() + '\n');
+          }
+          hbase_errors.incrementAndGet();
+          return arg;
+        }
+
+        public String toString() {
+          return "report error to channel";
+        }
+      }
+      return importDataPoint(tsdb, cmd).addErrback(new PutErrback());
+    } catch (NumberFormatException x) {
+      errmsg = "putctr: invalid value: " + x.getMessage() + '\n';
+      invalid_values.incrementAndGet();
+    } catch (IllegalArgumentException x) {
+      errmsg = "putctr: illegal argument: " + x.getMessage() + '\n';
+      illegal_arguments.incrementAndGet();
+    } catch (NoSuchUniqueName x) {
+      errmsg = "putctr: unknown metric: " + x.getMessage() + '\n';
+      unknown_metrics.incrementAndGet();
+    }
+    if (errmsg != null) {
+      LOG.debug(errmsg);
+      if (chan.isConnected()) {
+        chan.write(errmsg);
+      }
+    }
+    return Deferred.fromResult(null);
+  }
+
+  /**
+   * Handles HTTP putctr requests: each posted data point is added as a counter increment
+   *
+   * @param tsdb The TSDB to which we belong
+   * @param query The HTTP query from the user
+   * @throws IOException if there is an error parsing the query or formatting
+   * the output
+   * @throws BadRequestException if the user supplied bad data
+   * @since 2.0
+   */
+  public void execute(final TSDB tsdb, final HttpQuery query)
+      throws IOException {
+    requests.incrementAndGet();
+
+    // only accept POST
+    if (query.method() != HttpMethod.POST) {
+      throw new BadRequestException(HttpResponseStatus.METHOD_NOT_ALLOWED,
+          "Method not allowed", "The HTTP method [" + query.method().getName() +
+          "] is not permitted for this endpoint");
+    }
+
+    final List<IncomingDataPoint> dps = query.serializer().parsePutV1();
+    if (dps.size() < 1) {
+      throw new BadRequestException("No datapoints found in content");
+    }
+
+    final boolean show_details = query.hasQueryStringParam("details");
+    final boolean show_summary = query.hasQueryStringParam("summary");
+    final ArrayList<HashMap<String, Object>> details = show_details
+        ? new ArrayList<HashMap<String, Object>>() : null;
+    long success = 0;
+    long total = 0;
+
+    for (IncomingDataPoint dp : dps) {
+      total++;
+      try {
+        if (dp.getMetric() == null || dp.getMetric().isEmpty()) {
+          if (show_details) {
+            details.add(this.getHttpDetails("Metric name was empty", dp));
+          }
+          LOG.warn("Metric name was empty: " + dp);
+          continue;
+        }
+        if (dp.getTimestamp() <= 0) {
+          if (show_details) {
+            details.add(this.getHttpDetails("Invalid timestamp", dp));
+          }
+          LOG.warn("Invalid timestamp: " + dp);
+          continue;
+        }
+        if (dp.getValue() == null || dp.getValue().isEmpty()) {
+          if (show_details) {
+            details.add(this.getHttpDetails("Empty value", dp));
+          }
+          LOG.warn("Empty value: " + dp);
+          continue;
+        }
+        if (dp.getTags() == null || dp.getTags().size() < 1) {
+          if (show_details) {
+            details.add(this.getHttpDetails("Missing tags", dp));
+          }
+          LOG.warn("Missing tags: " + dp);
+          continue;
+        }
+        if (Tags.looksLikeInteger(dp.getValue())) {
+          tsdb.addCounter(dp.getMetric(), dp.getTimestamp(),
+              Tags.parseLong(dp.getValue()), dp.getTags());
+        } else { // counters are integral: parse as double, then truncate toward zero
+          final double valueAsDouble = Double.parseDouble(dp.getValue());
+          tsdb.addCounter(dp.getMetric(), dp.getTimestamp(),
+              (long) valueAsDouble, dp.getTags());
+        }
+        success++;
+      } catch (NumberFormatException x) {
+        if (show_details) {
+          details.add(this.getHttpDetails("Unable to parse value to a number",
+              dp));
+        }
+        LOG.warn("Unable to parse value to a number: " + dp);
+        invalid_values.incrementAndGet();
+      } catch (IllegalArgumentException iae) {
+        if (show_details) {
+          details.add(this.getHttpDetails(iae.getMessage(), dp));
+        }
+        LOG.warn(iae.getMessage() + ": " + dp);
+        illegal_arguments.incrementAndGet();
+      } catch (NoSuchUniqueName nsu) {
+        if (show_details) {
+          details.add(this.getHttpDetails("Unknown metric", dp));
+        }
+        LOG.warn("Unknown metric: " + dp);
+        unknown_metrics.incrementAndGet();
+      }
+    }
+
+    final long failures = total - success;
+    if (!show_summary && !show_details) {
+      if (failures > 0) {
+        throw new BadRequestException(HttpResponseStatus.BAD_REQUEST,
+            "One or more data points had errors",
+            "Please see the TSD logs or append \"details\" to the putctr request");
+      } else {
+        query.sendReply(HttpResponseStatus.NO_CONTENT, "".getBytes());
+      }
+    } else {
+      final HashMap<String, Object> summary = new HashMap<String, Object>();
+      summary.put("success", success);
+      summary.put("failed", failures);
+      if (show_details) {
+        summary.put("errors", details);
+      }
+
+      if (failures > 0) {
+        query.sendReply(HttpResponseStatus.BAD_REQUEST,
+            query.serializer().formatPutV1(summary));
+      } else {
+        query.sendReply(query.serializer().formatPutV1(summary));
+      }
+    }
+  }
+
+  /**
+   * Imports a single counter data point received over telnet.
+   *
+   * @param tsdb The TSDB to import the data point into.
+   * @param words The words describing the data point to import, in
+   * the following format: {@code [putctr, metric, timestamp, value, ..tags..]}
+   * @return A non-null deferred object that indicates the completion of the request.
+   * @throws NumberFormatException if the timestamp or value is invalid.
+   * @throws IllegalArgumentException if any other argument is invalid.
+   * @throws NoSuchUniqueName if the metric isn't registered.
+   */
+  private Deferred<Object> importDataPoint(final TSDB tsdb, final String[] words) {
+    words[0] = null; // Ditch the "putctr".
+    if (words.length < 5) { // Need at least: metric timestamp value tag
+      // ^ 5 and not 4 because words[0] is "putctr".
+      throw new IllegalArgumentException("not enough arguments"
+          + " (need at least 4, got " + (words.length - 1) + ')');
+    }
+    final String metric = words[1];
+    if (metric.length() <= 0) {
+      throw new IllegalArgumentException("empty metric name");
+    }
+    final long timestamp;
+    if (words[2].contains(".")) {
+      timestamp = Tags.parseLong(words[2].replace(".", ""));
+    } else {
+      timestamp = Tags.parseLong(words[2]);
+    }
+    if (timestamp <= 0) {
+      throw new IllegalArgumentException("invalid timestamp: " + timestamp);
+    }
+    final String value = words[3];
+    if (value.length() <= 0) {
+      throw new IllegalArgumentException("empty value");
+    }
+    final HashMap<String, String> tags = new HashMap<String, String>();
+    for (int i = 4; i < words.length; i++) {
+      if (!words[i].isEmpty()) {
+        Tags.parse(tags, words[i]);
+      }
+    }
+    // Counters are integral: fractional input is parsed as a double and truncated toward zero.
+    final long delta = Tags.looksLikeInteger(value)
+        ? Tags.parseLong(value) : (long) Double.parseDouble(value);
+    final Deferred<Object> result = tsdb.addCounter(metric, timestamp, delta, tags);
+    // Guard against a null Deferred so the caller can always chain an errback on the result.
+    return result != null ? result : Deferred.<Object>fromResult(null);
+  }
+
+  /**
+   * Simple helper to format an error trying to save a data point
+   *
+   * @param message The message to return to the user
+   * @param dp The datapoint that caused the error
+   * @return A hashmap with the message under "error" and the data point under "datapoint"
+   * @since 2.0
+   */
+  private HashMap<String, Object> getHttpDetails(final String message,
+      final IncomingDataPoint dp) {
+    final HashMap<String, Object> map = new HashMap<String, Object>();
+    map.put("error", message);
+    map.put("datapoint", dp);
+    return map;
+  }
+}
+
diff --git a/src/utils/Config.java b/src/utils/Config.java
index fe05ea3a92..be101dbae9 100644
--- a/src/utils/Config.java
+++ b/src/utils/Config.java
@@ -74,6 +74,9 @@ public class Config {
   /** tsd.storage.enable_compaction */
   private boolean enable_compactions = true;
 
+  /** tsd.storage.sum_duplicates */
+  private boolean sum_duplicates = false;
+
   /** tsd.storage.enable_appends */
   private boolean enable_appends = false;
 
@@ -209,6 +212,10 @@ public boolean auto_tagv() {
     return
auto_tagv;
   }
 
+  public boolean sum_duplicates() { // getter for the tsd.storage.sum_duplicates setting
+    return sum_duplicates; // assigned in loadStaticVariables()
+  }
+
   /** @param auto_metric whether or not to auto create metrics */
   public void setAutoMetric(boolean auto_metric) {
     this.auto_metric = auto_metric;
@@ -626,12 +633,14 @@ protected void setDefaults() {
     default_map.put("tsd.storage.hbase.data_table", "tsdb");
     default_map.put("tsd.storage.hbase.uid_table", "tsdb-uid");
     default_map.put("tsd.storage.hbase.tree_table", "tsdb-tree");
+    default_map.put("tsd.storage.hbase.use_hbase_counters", "false"); // NOTE(review): no field, getter or loadStaticVariables() entry for this key is visible in this diff, yet callers invoke config.use_hbase_counters() - confirm it is defined
     default_map.put("tsd.storage.hbase.meta_table", "tsdb-meta");
     default_map.put("tsd.storage.hbase.zk_quorum", "localhost");
     default_map.put("tsd.storage.hbase.zk_basedir", "/hbase");
     default_map.put("tsd.storage.hbase.prefetch_meta", "false");
     default_map.put("tsd.storage.enable_appends", "false");
     default_map.put("tsd.storage.repair_appends", "false");
+    default_map.put("tsd.storage.sum_duplicates", "false"); // off by default
     default_map.put("tsd.storage.enable_compaction", "true");
     default_map.put("tsd.storage.compaction.flush_interval", "10");
     default_map.put("tsd.storage.compaction.min_flush_threshold", "100");
@@ -755,6 +764,7 @@ public void loadStaticVariables() {
     auto_tagk = this.getBoolean("tsd.core.auto_create_tagks");
     auto_tagv = this.getBoolean("tsd.core.auto_create_tagvs");
     enable_compactions = this.getBoolean("tsd.storage.enable_compaction");
+    sum_duplicates = this.getBoolean("tsd.storage.sum_duplicates"); // backs the sum_duplicates() getter
     enable_appends = this.getBoolean("tsd.storage.enable_appends");
     repair_appends = this.getBoolean("tsd.storage.repair_appends");
     enable_chunked_requests = this.getBoolean("tsd.http.request.enable_chunked");