Sif
is a framework for fast, predictable, general-purpose distributed computing in the map/reduce paradigm.
Sif
is new, and currently under heavy development. It should be considered alpha software prior to a 1.0.0 release, with the API and behaviours subject to change.
Sif
is offered primarily as a simpler alternative to Apache Spark, with the following goals in mind:
Predictability: An emphasis on fixed-width data and in-place manipulation makes it easier to reason over the compute and memory requirements of a particular job. Sif
's API is designed around a "no magic" philosophy, and attempts to make obvious the runtime consequences of any particular line of code.
Scalability: While scaling up to massive datasets is the primary focus of any distributed computing framework, Sif
is also designed to scale down to small datasets as well with minimal startup time and other cluster-oriented overhead.
Ease of Integration: Rather than deploying Sif
as a complex, persistent cluster, Sif
is a library designed to ease the integration of cluster computing functionality within a larger application or architectural context. Write your own REST API which manages the initiation of a Sif
pipeline!
Ease of Development: Sif
applications are traceable and debuggable, and the core Sif
codebase is designed to be as small as possible.
API Minimalism: A single representation of distributed data, with a single set of tools for manipulating it.
Architectural Minimalism: Throw away YARN and Mesos. Compile your Sif
pipeline, package it as a Docker image, then deploy it to distributed container infrastructure such as Docker Swarm or Kubernetes.
Note: Sif
is developed and tested against Go 1.18, leveraging generics support:
$ go get github.com/go-sif/sif
Sif
facilitates the definition and execution of a distributed compute pipeline through the use of a few basic components. For the sake of this example, we will assume that we have JSON Lines-type data with the following format:
{"id": 1234, "meta": {"uuid": "27366d2d-502c-4c03-84c3-55dc5ecedd3f", "name": "John Smith"}}
{"id": 5678, "meta": {"uuid": "f21dec0a-37f3-4b0d-9d92-d26b11c62ed8", "name": "Jane Doe"}}
...
A Schema
outlines the structure of the data which will be manipulated. In Sif
, data is represented as a sequence of Row
s, each of which consist of Column
s with particular ColumnType
s.
package main
import (
"log"
"github.com/go-sif/sif"
"github.com/go-sif/sif/coltype"
"github.com/go-sif/sif/schema"
)
func main() {
id := coltype.Int32("id")
// Schemas should employ fixed-width ColumnTypes whenever possible
metaUUID := coltype.String("meta.uuid", 36)
// or variable-width ColumnTypes, if the size of a field is not known
metaName := coltype.VarString("meta.name")
schema, err := schema.CreateSchema(id, metaUUID, metaName)
if err != nil {
log.Fatal(err)
}
}
A DataSource
represents a source of raw data, which will be partitioned and parsed into Row
s via a Parser
in parallel across workers in the cluster. Sif
contains several example DataSource
s, primarily for testing purposes, but it is intended that DataSource
s for common sources such as databases, Kafka, etc. will be provided as separate packages.
DataSource
s typically use supplied column names to fetch specific data from the underlying source (as shown in the below jsonl
example).
Ultimately, a DataSource
provides a DataFrame
which can be manipulated by Sif
operations.
package main
import (
"log"
"path"
"github.com/go-sif/sif"
"github.com/go-sif/sif/coltype"
"github.com/go-sif/sif/schema"
"github.com/go-sif/sif/datasource/file"
"github.com/go-sif/sif/datasource/parser/jsonl"
)
func main() {
id := coltype.Int32("id")
// In this case, since our Schema featured column names with dots,
// the jsonl parser is smart enough to use these column names to
// search within each JSON object for a nested field matching
// these paths.
metaUUID := coltype.String("meta.uuid", 36)
metaName := coltype.VarString("meta.name")
schema, err := schema.CreateSchema(id, metaUUID, metaName)
if err != nil {
log.Fatal(err)
}
parser := jsonl.CreateParser(&jsonl.ParserConf{
PartitionSize: 128,
})
frame := file.CreateDataFrame("path/to/*.jsonl", parser, schema)
}
A DataFrame
facilitates the definition of an execution plan. Multiple Operation
s are chained together, and then passed to Sif
for evaluation:
package main
import (
"path"
"log"
"strings"
"github.com/go-sif/sif"
"github.com/go-sif/sif/schema"
"github.com/go-sif/sif/datasource/file"
"github.com/go-sif/sif/datasource/parser/jsonl"
ops "github.com/go-sif/sif/operations/transform"
)
func main() {
id := coltype.Int32("id")
metaUUID := coltype.String("meta.uuid", 36)
metaName := coltype.VarString("meta.name")
schema, err := schema.CreateSchema(id, metaUUID, metaName)
if err != nil {
log.Fatal(err)
}
parser := jsonl.CreateParser(&jsonl.ParserConf{
PartitionSize: 128,
})
frame := file.CreateDataFrame("path/to/*.jsonl", parser, schema)
lowercaseName := coltype.VarString("lowercase_name") // a column we will add
frame, err := frame.To(
ops.AddColumn(lowercaseName),
ops.Map(func(row sif.Row) error {
if metaName.IsNil(row) {
return nil
}
name, err := metaName.From(row)
if err != nil {
return err
}
return lowercaseName.To(row, strings.ToLower(name))
}),
)
if err != nil {
log.Fatal(err)
}
}
Execution of a DataFrame
involves starting and passing it to a Sif
cluster. Sif
clusters, at the moment, consist of a single Coordinator
and multiple Worker
s. Each is an identical binary, with the difference in role determined by the SIF_NODE_TYPE
environment variable (set to "coordinator"
or "worker"
). This makes it easy to compile a single executable which can then be deployed and scaled up or down as one sees fit.
package main
import (
"path"
"log"
"strings"
"context"
"github.com/go-sif/sif"
"github.com/go-sif/sif/schema"
"github.com/go-sif/sif/datasource/file"
"github.com/go-sif/sif/datasource/parser/jsonl"
ops "github.com/go-sif/sif/operations/transform"
"github.com/go-sif/sif/cluster"
)
func main() {
id := coltype.Int32("id")
metaUUID := coltype.String("meta.uuid", 36)
metaName := coltype.VarString("meta.name")
schema, err := schema.CreateSchema(id, metaUUID, metaName)
if err != nil {
log.Fatal(err)
}
parser := jsonl.CreateParser(&jsonl.ParserConf{
PartitionSize: 128,
})
frame := file.CreateDataFrame("path/to/*.jsonl", parser, schema)
lowercaseName := coltype.VarString("lowercase_name")
frame, err := frame.To(
ops.AddColumn(lowercaseName),
ops.Map(func(row sif.Row) error {
if metaName.IsNil(row) {
return nil
}
name, err := metaName.From(row)
if err != nil {
return err
}
return lowercaseName.To(row, strings.ToLower(name))
}),
)
if err != nil {
log.Fatal(err)
}
// Define a node
// Sif will read the SIF_NODE_TYPE environment variable to
// determine whether this copy of the binary
// is a "coordinator" or "worker".
opts := &cluster.NodeOptions{
NumWorkers: 2,
CoordinatorHost: "insert.coordinator.hostname",
}
node, err := cluster.CreateNode(opts)
if err != nil {
log.Fatal(err)
}
// start this node in the background and run the DataFrame
defer node.GracefulStop()
go func() {
err := node.Start(frame)
if err != nil {
log.Fatal(err)
}
}()
// result will be nil in this case, as only certain
// operations produce a result.
result, err := node.Run(context.Background())
if err != nil {
log.Fatal(err)
}
}
Sif
includes multiple operations suitable for manipulating DataFrame
s, which can be found under the github.com/go-sif/sif/operations/transform
package.
Additional utility operations are included in the github.com/go-sif/sif/operations/util
package, which at this time only includes Collect()
, which allows for the collection of results to the Coordinator
for further, local processing.
A couple of complex Operation
s are covered in additional detail here:
Reduction in Sif
is a two step process:
- A
KeyingOperation
labelsRow
s, with the intention that twoRow
s with the same key are reduced together - A
ReductionOperation
defines the mechanism by which twoRow
s are combined (the "right"Row
into the "left"Row
)
For example, if we wanted to bucket the JSON Lines data from Getting Started by name, and then produce counts for names beginning with the same letter:
// ...
totalCol := coltype.Uint32("total")
frame, err := frame.To(
// Add a column to store the total for each first-letter bucket
ops.AddColumn(totalCol),
ops.Reduce(func(row sif.Row) ([]byte, error) {
// Our KeyingOperation comes first, using the first letter as the key
name, err := metaName.From(row)
if err != nil {
return nil, err
}
if len(name) == 0 {
return []byte{0}, nil
}
return []byte{name[0]}, nil
}, func(lrow sif.Row, rrow sif.Row) error {
// Our ReductionOperation comes second
// Since our keys ensure two Rows are only reduced together if they
// have a matching key, we can just add the totals together.
lval, err := totalCol.From(lrow)
if err != nil {
return err
}
rval, err := totalCol.From(rrow)
if err != nil {
return err
}
return totalCol.To(lrow, lval+rval)
}),
)
// ...
Tip: ops.KeyColumns(colNames ...string)
can be used with ops.Reduce
to quickly produce a key (or compound key) from a set of column values.
Sif Accumulator
s are an alternative mechanism for reduction, which offers full customization of reduciton technique, in exchange for accumulation ending a sif
pipeline. In exchange for losing the ability to further transform and reduce data, Accumulator
s offer the potential for significant performance benefits.
Sif offers built-in Accumulators
in the accumulators
package.
For example, we can use accumulators.Counter()
to efficiently count records:
// ...
counter := accumulators.Counter()
frame, err := frame.To(
util.Accumulate(counter)
)
// ...
// In this case, node.Run returns an Accumulator, which can be
// manipulated on the Coordinator node.
result, err := node.Run(context.Background())
if node.IsCoordinator() {
totalRows, _ = counter.Value(result.Accumulator)
}
Collection is the process of pulling results from distributed data back to the Coordinator
for local processing. This is not generally encouraged - rather, it is best if Worker
s write their results directly to an output destination. But, it is occasionally useful, such as in the writing of tests:
// ...
lowercaseName := coltype.VarString("lowercase_name")
frame, err := frame.To(
ops.AddColumn(lowercaseName),
ops.Map(func(row sif.Row) error {
if metaName.IsNil(row) {
return nil
}
name, err := metaName.Get(row)
if err != nil {
return err
}
return lowercaseName.Set(row, strings.ToLower(name))
}),
// To discourage use and unpredictability, you must specify exactly
// how many Partitions of data you wish to Collect:
util.Collect(1)
)
// ...
// In this case, node.Run returns a CollectedPartition, which can be
// manipulated on the Coordinator node.
result, err := node.Run(context.Background())
if node.IsCoordinator() {
err = result.Collected.ForEachRow(func(row sif.Row) error {
// Do something with results
})
}
// ...
See Implementing Custom ColumnTypes for details.
See Implementing Custom Accumulators for details.
See Implementing Custom DataSources for details.
See Implementing Custom Parsers for details.
Sif
is licensed under the Apache 2.0 License, found in the LICENSE file.