
Union and intersection of TimeSeries #100

Open · wants to merge 14 commits into master
Conversation

ahmed-mahran (Contributor)

Additions to the public API:

  • DateTimeIndex helper methods
    • millisIterator(): Iterator[Long]
    • zonedDateTimeIterator(): Iterator[ZonedDateTime]
    • insertionLoc methods to find the location at which the given date-time could be inserted. It is the location of the first date-time that is greater than the given date-time. If the given date-time is greater than or equal to the last date-time in the index, the index size is returned. Used in transformations on multiple indices.
    • atZone(zone: ZoneId) adjusts the time zone of the index. Used in transformations on multiple indices.
  • Transformations added to TimeSeries
    • union merges multiple multivariate time series with disjoint keys into one multivariate time series by taking the union of all time indices and rebasing every univariate series onto the union index.
    • intersect merges multiple multivariate time series with disjoint keys into one multivariate time series over the intersection of their time indices, when that intersection is non-empty.
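To illustrate the intended semantics of the index-level operations underlying union and intersect, here is a minimal Python sketch (the actual implementation is Scala; the function names and the modeling of an index as a sorted tuple of timestamps are illustrative only):

```python
# Hypothetical sketch: a DateTimeIndex modeled as a sorted tuple of instants.

def index_union(*indices):
    """Union of several time indices: every instant present in any index, once."""
    return tuple(sorted(set().union(*map(set, indices))))

def index_intersect(*indices):
    """Intersection: only instants present in every index; may be empty."""
    common = set(indices[0]).intersection(*map(set, indices[1:]))
    return tuple(sorted(common))

a = (1, 2, 3)
b = (2, 3, 4)
print(index_union(a, b))      # (1, 2, 3, 4)
print(index_intersect(a, b))  # (2, 3)
```

The "if possible" caveat on intersect corresponds to the case where the intersection above comes out empty.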

Additions to the private API:

  • Generic DateTimeIndex rebaser at TimeSeriesUtils.rebaserGeneric(sourceIndex: DateTimeIndex, targetIndex: DateTimeIndex, defaultValue: Double). Helpful for transformations on multiple indices of different types.
  • DateTimeIndexUtils object holds utility methods for DateTimeIndex
    • dateTimeIndexOrdering defines an ordering on DateTimeIndex s.t. for two DateTimeIndex x and y, x < y iff x.first < y.first || (x.first == y.first && x.size < y.size)
    • simplify(indices: Array[DateTimeIndex]): Array[DateTimeIndex] merges contiguous indices where possible
    • union unions a list of indices into one DateTimeIndex
    • intersect intersects a list of indices and returns a new index if possible
  • TimeSeriesUtils
    • rebaseAndMerge(tss: Array[TimeSeries[K]], newIndex: DateTimeIndex, defaultValue: Double): TimeSeries[K] a utility for rebasing a collection of multivariate time series of disjoint keys and merging them into one multivariate time series
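A Python sketch of the rebaseAndMerge semantics described above, with each multivariate series modeled as a dict of key → {instant: value} (names and data model are illustrative, not the library's API):

```python
import math

def rebase(series, new_index, default_value):
    """Re-express one univariate series on new_index, filling gaps."""
    return [series.get(t, default_value) for t in new_index]

def rebase_and_merge(ts_list, new_index, default_value=math.nan):
    """Rebase several multivariate series (disjoint keys assumed) onto
    new_index and merge them into a single multivariate series."""
    merged = {}
    for ts in ts_list:                 # keys are assumed disjoint across inputs
        for key, series in ts.items():
            merged[key] = rebase(series, new_index, default_value)
    return merged

ts1 = {"k1": {1: 1.0, 2: 2.0}}
ts2 = {"k2": {2: 4.0, 3: 5.0}}
print(rebase_and_merge([ts1, ts2], (1, 2, 3), default_value=0.0))
# {'k1': [1.0, 2.0, 0.0], 'k2': [0.0, 4.0, 5.0]}
```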

* b: is the array index of the date-time index where the queried date-time dt could
* be inserted. This value is used by the insertionLoc method.
*/
private def binarySearch(low: Int, high: Int, dt: ZonedDateTime): (Int, Int) = {
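For reference, the insertionLoc behavior described earlier (the location of the first date-time strictly greater than the query; the index size when the query is >= the last instant) matches the semantics of Python's bisect_right, as this sketch shows:

```python
from bisect import bisect_right

def insertion_loc(index, dt):
    """Index of the first instant strictly greater than dt;
    len(index) when dt >= the last instant."""
    return bisect_right(index, dt)

idx = [10, 20, 30]
print(insertion_loc(idx, 20))  # 2  (first instant > 20 is 30, at index 2)
print(insertion_loc(idx, 30))  # 3  (== len(idx))
print(insertion_loc(idx, 35))  # 3  (== len(idx))
```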
Can you include a header comment here to indicate what the two tuple values mean? ~ @sryza

Added

sryza commented Dec 27, 2015

Do you think it will be common for users to want to union / intersect both vertically and horizontally at the same time? I'm wondering if it makes sense to have different methods for unioning time-wise vs. column-wise.

@ahmed-mahran

I'm sorry I didn't get what you mean.

I think union/intersect time-wise implies combining columns; it is well defined how to handle time overlaps, but it is not defined how to handle overlaps of columns with equal keys. Hence, the keys of unioned/intersected time series should be disjoint. The predefined method def union(vec: Vector, key: K): TimeSeries[K] adds a column to a set of columns, assuming a complete overlap in time of the unioned time series. The new union methods are just more generic definitions. What do you think?

* or a is of size 1 and b is irregular -> d is irregular
*/
def simplify(indices: Array[DateTimeIndex]): Array[DateTimeIndex] = {
val simplified = new ListBuffer[DateTimeIndex]
ArrayBuffer is preferable to ListBuffer for performance ~ @sryza

Only append operations are performed, and ListBuffer is better than ArrayBuffer at appends.
http://docs.scala-lang.org/overviews/collections/performance-characteristics.html

Am I missing something?

sryza commented Dec 29, 2015

Thinking about this a little more, my concern is that, as an RDD, TimeSeriesRDD already has a union method, and RDD union is defined to operate row-wise. Thus, users wouldn't expect it to operate column-wise (i.e. to make DateTimeIndex transformations) as well. The union operation defined here acts more like an outer join: it both widens and lengthens the dataset. Users might also be interested in intersecting across rows, but unioning across columns (time). This acts more like an inner join.

Then there's also the question of "left joins", both across rows and columns. E.g., I imagine that the most common use case for this type of functionality would be someone who wants to update a dataset from observations from a wider time range. They have a data set, and they have another dataset, which possibly has more keys than the first dataset, and covers a different time range. They want their resulting dataset to include only the keys from the first dataset, but to cover the time ranges from both the first and second data set.

So ultimately I think there are two ternary parameters that come up when someone wants to join two TimeSeriesRDDs:

  • Inner vs. Outer vs. Left on the row axis - Inner means that the resulting dataset only contains keys that were present in both of the input datasets. Outer means that the resulting dataset contains keys that were present in either of the input datasets. Left means that the resulting dataset only contains keys that were present in the first input dataset.
  • Inner vs. Outer vs. Left on the column axis - Inner means that the resulting dataset only contains times that were present in both of the input datasets. Outer means that the resulting dataset contains times that were present in either of the input datasets. Left means that the resulting dataset only contains times that were present in the first input dataset.
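The two ternary parameters can be sketched as one axis-selection function applied independently to the key (row) axis and the time (column) axis. A Python illustration (pure sets; the function name is hypothetical, not the library's API):

```python
def join_axis(left, right, how):
    """Select the keys or times of the joined result from two datasets."""
    if how == "inner":          # present in both inputs
        return sorted(set(left) & set(right))
    if how == "outer":          # present in either input
        return sorted(set(left) | set(right))
    if how == "left":           # present in the first input only
        return sorted(set(left))
    raise ValueError(how)

keys1, keys2 = ["k1", "k2"], ["k2", "k3"]
times1, times2 = [1, 2], [2, 3, 4]

# e.g. the "update a dataset from a wider time range" use case:
# left on the row axis, outer on the column axis
print(join_axis(keys1, keys2, "left"))     # ['k1', 'k2']
print(join_axis(times1, times2, "outer"))  # [1, 2, 3, 4]
```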

I still need to think a little bit about the best way to expose an API for this, but I am of course open to suggestions.

@ahmed-mahran
I see your point.

It is more about joins than just horizontal gluing of datasets. In fact, it's about set operations on the time index. The subsequent commits of the original PR #88 add left (right) join and except (or difference) set operations. These kinds of transformations are basically transformations of the underlying date-time index, such that:

new_ts = transform(ts) is equivalent to:

    new_index = transform(ts.index)
    new_ts = ts.rebase(new_index)

new_ts = transform(ts1, ts2) is equivalent to:

    new_index = transform(ts1.index, ts2.index)
    new_ts = ts1.rebase(new_index) + ts2.rebase(new_index) // '+' is horizontal concatenation
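As a tiny concrete check of this equivalence, here is a Python sketch for the two-series union case, with each univariate series modeled as a dict {instant: value} and NaN as the missing-value default (names are illustrative only):

```python
import math

def union_index(i1, i2):
    """transform(ts1.index, ts2.index) for the union case."""
    return tuple(sorted(set(i1) | set(i2)))

def rebase(series, index):
    """ts.rebase(new_index): fill instants the series lacks with NaN."""
    return [series.get(t, math.nan) for t in index]

ts1 = {1: 1.0, 2: 2.0}     # a univariate series on index (1, 2)
ts2 = {2: 4.0, 3: 5.0}     # a univariate series on index (2, 3)

new_index = union_index(ts1, ts2)        # (1, 2, 3)
new_ts = [rebase(ts1, new_index),        # ts1.rebase(new_index)
          rebase(ts2, new_index)]        # '+' = stacking the rebased columns
```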

We have two concerns here:

  • Naming convention (union vs. outer join): Let's defer this until we reach a concrete understanding of the second concern. I think we are going to arrive at something different, in concept, from what RDDs currently have. In general, method names should not invite conflicts or misunderstandings.
  • Transformation domain (i.e. acting on time, keys or both)

Let's tackle the second concern by example:

Consider the following two multivariate time series:

ts1    t1  t2
k1     v1  v2
k2     v3  v4

ts2    t2  t3  t4
k2     v4  v5  v6
k3     v7  v8  v9

Let's consider an outer join but vary the domain between key, time and key-time:

Outer join on key

       t1  t2  t2  t3  t4
k1     v1  v2  -   -   -
k2     v3  v4  v4  v5  v6
k3     -   -   v7  v8  v9

Distinct keys and concatenated time indices. This might result in an improper time index; it could be okay if the time indices are disjoint/non-overlapping.

Outer join on time

       t1  t2  t3  t4
k1     v1  v2  -   -
k2     v3  v4  -   -
k2     -   v4  v5  v6
k3     -   v7  v8  v9

Distinct time instants and concatenated keys. This might result in duplicate keys; it could be okay if the keys are disjoint.

Outer join on key-time

       t1  t2        t3  t4
k1     v1  v2        -   -
k2     v3  (v4, v4)  v5  v6
k3     -   v7        v8  v9

Distinct time instants and distinct keys. This might result in multiple values (collisions) for the same key and instant; it could be okay if both the keys and the time indices are disjoint.

Outer join on key vs. outer join on time
Disjoint time indices are rarely affordable in practice. Moreover, it is not acceptable for any operation to produce an improper time index, not to mention the complexity of reconciling one. On the other hand, duplicate keys are an acceptable outcome of the operation, and reconciling them is a matter of reducing by key. Hence, outer join on time is more sound than outer join on key.

Outer join on key-time vs. outer join on time
Collision resolution might be hard to generalize and would add complexity to the API. Thinking of the structure of the outcome, a univariate time series where an instant has multiple values does not fit the current framework. If a collision-resolution function were to be provided, it might need to operate on a window in time around the collision instant. The question here is: wouldn't a join on time, optionally followed by whatever transformations the user applies to resolve duplicate keys (at least a reduction by key), be simpler than a join on key-time?

sryza commented Jan 3, 2016

Thanks for all the detail above. I agree with most of your points. If I understand correctly, your ultimate assertion is that we should start by exposing binary operators that return a TimeSeriesRDD with a unified time index, but that don't merge the data in time series with the same key?

I think that sounds mostly reasonable, but there's a performance concern. In the most common situations, I think users ultimately will want to merge series with the same keys. If that is implemented with tsRdd1.joinOnTime(tsRdd2).reduceByKey(mergeFunc), the joinOnTime operator will need to copy all of the series in the input datasets into larger vectors, and then when the shuffle occurs, these larger vectors will be the ones passed around. So, by directly shuffling the input vectors, a combined tsRdd.joinAndTimeAndMergeKeys(tsRdd2) could avoid a memory copy and shuffle only half the amount of data. The only way I can think of around this would be to provide a new Vector implementation that wraps an existing vector but supports leading and trailing NaNs. Thoughts?

If you want, while we figure out the right approach here, you could post a PR that just includes DateTimeIndex.union (not TimeSeries) and I could review and merge that.

@ahmed-mahran
... start by exposing binary operators that return a TimeSeriesRDD with a unified time index, but that don't merge the data in time series with the same key?

I would say yes. I'd rather say that the API assumes disjoint keys.

... but there's a performance concern ...

👍 good catch

The only way I can think of around this would be to provide a new Vector implementation that wraps an existing vector but supports leading and trailing NaNs. Thoughts?

Sounds like a sparse vector. Generalizing on that, how about a vector that can hold a special element, call it a span, representing a single value (say NaN) over a contiguous range of indices?

Conceptually, I can see a vector here as a multipart function, where each part is defined on a separate range of indices: one part could be a direct mapping (a vector in the usual sense), another could be a constant value, another could be a spline ... I think this might help a lot with lengthy synthesized time series (upsampling, for example): you don't store the data, you store a generating model of it.

@ahmed-mahran
Issued PR #101 for the first three commits, about a generic rebaser.

sryza commented Jan 4, 2016

I'd rather say that the API assumes disjoint keys.

I think in the most common cases keys will not be disjoint. For example, I imagine that a common case is that one has a bunch of tick data from one source covering a certain time range, as well as a bunch of tick data for the same ticker symbols from another source, covering a different time range.

Sounds like a sparse vector. Generalizing on that, how about a vector that can hold a special element, call it a span, representing a single value (say NaN) over a contiguous range of indices?

Yeah, somewhere between a sparse vector and a dense vector, i.e. a vector with a dense range that is otherwise empty. Using the existing Breeze sparse vector would be inefficient because it would require storing an index alongside every value in the dense range.
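A minimal Python sketch of such a "dense range, otherwise empty" vector (the class name and representation are hypothetical; the point is that only leading/trailing pad counts are stored, not per-element indices as in a sparse vector, and no copy of the payload is made):

```python
import math

class RangeDenseVector:
    """A vector that is dense on one contiguous range and NaN elsewhere."""

    def __init__(self, leading, values, trailing):
        self.leading = leading      # number of leading NaNs
        self.values = values        # the wrapped dense payload
        self.trailing = trailing    # number of trailing NaNs

    def __len__(self):
        return self.leading + len(self.values) + self.trailing

    def __getitem__(self, i):
        if i < 0 or i >= len(self):
            raise IndexError(i)
        j = i - self.leading
        if 0 <= j < len(self.values):
            return self.values[j]
        return math.nan             # outside the dense range

v = RangeDenseVector(2, [1.0, 2.0], 1)   # behaves like [nan, nan, 1.0, 2.0, nan]
print(len(v))        # 5
print(v[2], v[3])    # 1.0 2.0
```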

Conceptually, I can see a vector here as a multipart function such that each part is defined within a separate range of indices

Yeah, exactly. That said, I don't think it makes sense to add this in full generality right now.
