Skip to content

Latest commit

 

History

History
78 lines (57 loc) · 2.11 KB

README.md

File metadata and controls

78 lines (57 loc) · 2.11 KB

streaming

Go Report Card LICENSE LICENSE

streaming

Streaming is a client library, where the input and output data are stored in Kafka clusters.

Introduction

Streaming is a library written for kafka streamming processor,.

Basic Usage

Installation

go get -u github.com/flyaways/streaming

Usage

Streaming Processor

package main

import (
	"log"
	"os"
	"os/signal"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
	"github.com/flyaways/streaming"
)

func Processor(msg *sarama.ConsumerMessage, outTopic []string) ([]*sarama.ProducerMessage, error) {
	msgs := []*sarama.ProducerMessage{}
	if msg.Topic == "input-topic-2" {
		msgs = append(msgs, &sarama.ProducerMessage{
			Topic: outTopic[0],
			Key:   sarama.ByteEncoder(msg.Key),
			Value: sarama.ByteEncoder(msg.Value),
		})
	}
	return msgs, nil
}

func main() {
	if err := streaming.NewStreaming(
		[]string{"127.0.0.1:9092"},
		[]string{"input-topic1", "input-topic-2"},
		[]string{"output-topic1", "output-topic"},
		"flyaways-streaming-group",
		cluster.NewConfig(),
		Processor); err != nil {
		log.Panic(err)
	}

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals
}

Credits

Licenses

https://www.apache.org/licenses/LICENSE-2.0

FOSSA Status