diff --git a/Makefile b/Makefile index db78e09f..28d57df2 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,7 @@ help: # # Usage: # make template : create a mapper based on a template. + # make dmitemplate : create a mapper-dmi based on a template. # make mapper {mapper-name} : execute mapper building process. # make device {device-name} : execute device simulator build process. (only used by modbus) # make e2e {mapper-name} : execute mapper e2e building process. (only support modbus and opcua) @@ -36,4 +37,4 @@ $(make_rules): @$(curr_dir)/hack/make-rules/$@.sh $(rest_args) .DEFAULT_GOAL := all -.PHONY: $(make_rules) build \ No newline at end of file +.PHONY: $(make_rules) build diff --git a/_template/mapper-dmi/Dockerfile b/_template/mapper-dmi/Dockerfile new file mode 100644 index 00000000..3d461360 --- /dev/null +++ b/_template/mapper-dmi/Dockerfile @@ -0,0 +1,10 @@ +FROM ubuntu:16.04 + +RUN mkdir -p kubeedge + +COPY ./bin/Template kubeedge/ +COPY ./config.yaml kubeedge/ + +WORKDIR kubeedge + +CMD ./Template diff --git a/_template/mapper-dmi/Makefile b/_template/mapper-dmi/Makefile new file mode 100644 index 00000000..a266fedd --- /dev/null +++ b/_template/mapper-dmi/Makefile @@ -0,0 +1,36 @@ +SHELL := /bin/bash + +curr_dir := $(patsubst %/,%,$(dir $(abspath $(lastword $(MAKEFILE_LIST))))) +rest_args := $(wordlist 2, $(words $(MAKECMDGOALS)), $(MAKECMDGOALS)) +$(eval $(rest_args):;@:) + +help: + # + # Usage: + # make template : create a mapper based on a template. + # make mapper {mapper-name} : execute mapper building process. + # make all : execute building process to all mappers. + # + # Actions: + # - mod, m : download code dependencies. + # - lint, l : verify code via go fmt and `golangci-lint`. + # - build, b : compile code. + # - package, p : package docker image. + # - test, t : run unit tests. + # - clean, c : clean output binary. + # + # Parameters: + # ARM : true or undefined + # ARM64 : true or undefined + # + # Example: + # - make mapper modbus ARM64=true : execute `build` "modbus" mapper for ARM64. + # - make mapper modbus test : execute `test` "modbus" mapper. + @echo + +make_rules := $(shell ls $(curr_dir)/hack/make-rules | sed 's/.sh//g') +$(make_rules): + @$(curr_dir)/hack/make-rules/$@.sh $(rest_args) + +.DEFAULT_GOAL := help +.PHONY: $(make_rules) build test package diff --git a/_template/mapper-dmi/cmd/main.go b/_template/mapper-dmi/cmd/main.go new file mode 100644 index 00000000..5932715b --- /dev/null +++ b/_template/mapper-dmi/cmd/main.go @@ -0,0 +1,63 @@ +package main + +import ( + "errors" + "os" + + "k8s.io/klog/v2" + + "github.com/kubeedge/mappers-go/config" + "github.com/kubeedge/mappers-go/mappers/Template/device" + "github.com/kubeedge/mappers-go/pkg/common" + "github.com/kubeedge/mappers-go/pkg/grpcserver" + "github.com/kubeedge/mappers-go/pkg/util/grpcclient" + "github.com/kubeedge/mappers-go/pkg/util/parse" +) + +func main() { + var err error + var c config.Config + + klog.InitFlags(nil) + defer klog.Flush() + + if err = c.Parse(); err != nil { + klog.Fatal(err) + os.Exit(1) + } + klog.Infof("config: %+v", c) + + grpcclient.Init(&c) + + // start grpc server + grpcServer := grpcserver.NewServer( + grpcserver.Config{ + SockPath: c.GrpcServer.SocketPath, + Protocol: common.ProtocolCustomized, + }, + device.NewDevPanel(), + ) + + panel := device.NewDevPanel() + err = panel.DevInit(&c) + if err != nil && !errors.Is(err, parse.ErrEmptyData) { + klog.Fatal(err) + } + klog.Infoln("devInit finished") + + // register to edgecore + // if dev init mode is register, mapper's dev will init when registry to edgecore + if c.DevInit.Mode != common.DevInitModeRegister { + klog.Infoln("======dev init mode is not register, will register to edgecore") + // TODO health check + if _, _, err = grpcclient.RegisterMapper(&c, false); err != nil { + klog.Fatal(err) + } + klog.Infoln("registerMapper finished") + } + go panel.DevStart() + defer grpcServer.Stop() + if err = grpcServer.Start(); err != nil { + klog.Fatal(err) + } +} diff --git a/_template/mapper-dmi/config.yaml b/_template/mapper-dmi/config.yaml new file mode 100644 index 00000000..c74bc29a --- /dev/null +++ b/_template/mapper-dmi/config.yaml @@ -0,0 +1,12 @@ +grpc_server: + socket_path: /etc/kubeedge/Template.sock +common: + name: Template-mapper + version: v1.13.0 + api_version: v1.0.0 + protocol: # TODO your protocol name + address: 127.0.0.1 + edgecore_sock: /etc/kubeedge/dmi.sock +dev_init: + mode: register + diff --git a/_template/mapper-dmi/device/device.go b/_template/mapper-dmi/device/device.go new file mode 100644 index 00000000..d481460a --- /dev/null +++ b/_template/mapper-dmi/device/device.go @@ -0,0 +1,344 @@ +package device + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/signal" + "sync" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/mappers-go/config" + "github.com/kubeedge/mappers-go/mappers/Template/driver" + "github.com/kubeedge/mappers-go/pkg/common" + "github.com/kubeedge/mappers-go/pkg/util/parse" +) + +type DevPanel struct { + deviceMuxs map[string]context.CancelFunc + devices map[string]*driver.CustomizedDev + models map[string]common.DeviceModel + protocols map[string]common.Protocol + wg sync.WaitGroup + serviceMutex sync.Mutex + quitChan chan os.Signal +} + +var ( + devPanel *DevPanel + once sync.Once +) + +// NewDevPanel init and return devPanel +func NewDevPanel() *DevPanel { + once.Do(func() { + devPanel = &DevPanel{ + deviceMuxs: make(map[string]context.CancelFunc), + devices: make(map[string]*driver.CustomizedDev), + models: make(map[string]common.DeviceModel), + protocols: make(map[string]common.Protocol), + wg: sync.WaitGroup{}, + serviceMutex: sync.Mutex{}, + quitChan: make(chan os.Signal), + } + }) + return devPanel +} + +// DevStart start all devices. +func (d *DevPanel) DevStart() { + for id, dev := range d.devices { + klog.V(4).Info("Dev: ", id, dev) + ctx, cancel := context.WithCancel(context.Background()) + d.deviceMuxs[id] = cancel + d.wg.Add(1) + go d.start(ctx, dev) + } + signal.Notify(d.quitChan, os.Interrupt) + go func() { + <-d.quitChan + for id, device := range d.devices { + err := device.CustomizedClient.StopDevice() + if err != nil { + klog.Errorf("Service has stopped but failed to stop %s:%v", id, err) + } + } + klog.V(1).Info("Exit mapper") + os.Exit(1) + }() + d.wg.Wait() +} + +// start the device +func (d *DevPanel) start(ctx context.Context, dev *driver.CustomizedDev) { + defer d.wg.Done() + + var protocolConfig driver.TemplateProtocolConfig + if err := json.Unmarshal(dev.Instance.PProtocol.ProtocolConfigs, &protocolConfig); err != nil { + klog.Errorf("Unmarshal ProtocolConfigs error: %v", err) + return + } + var protocolCommonConfig driver.TemplateProtocolCommonConfig + if err := json.Unmarshal(dev.Instance.PProtocol.ProtocolCommonConfig, &protocolCommonConfig); err != nil { + klog.Errorf("Unmarshal ProtocolCommonConfig error: %v", err) + return + } + + client, err := driver.NewClient(protocolCommonConfig, protocolConfig) + if err != nil { + klog.Errorf("Init dev %s error: %v", dev.Instance.Name, err) + return + } + dev.CustomizedClient = client + err = dev.CustomizedClient.InitDevice() + if err != nil { + klog.Errorf("Init device %s error: %v", dev.Instance.ID, err) + return + } + go initTwin(ctx, dev) + <-ctx.Done() +} + +// initTwin initialize the timer to get twin value. +func initTwin(ctx context.Context, dev *driver.CustomizedDev) { + for _, twin := range dev.Instance.Twins { + var visitorConfig driver.TemplateVisitorConfig + err := json.Unmarshal(twin.PVisitor.VisitorConfig, &visitorConfig) + if err != nil { + klog.Errorf("Unmarshal VisitorConfig error: %v", err) + continue + } + err = setVisitor(&visitorConfig, &twin, dev) + if err != nil { + klog.Error(err) + continue + } + twinData := &TwinData{ + DeviceName: dev.Instance.Name, + Client: dev.CustomizedClient, + Name: twin.PropertyName, + Type: twin.Desired.Metadatas.Type, + VisitorConfig: &visitorConfig, + Topic: fmt.Sprintf(common.TopicTwinUpdate, dev.Instance.ID), + } + + collectCycle := time.Duration(twin.PVisitor.CollectCycle) + if collectCycle == 0 { + collectCycle = 1 * time.Second + } + ticker := time.NewTicker(collectCycle) + go func() { + for { + select { + case <-ticker.C: + twinData.Run() + case <-ctx.Done(): + return + } + } + }() + } +} + +// setVisitor check if visitor property is readonly, if not then set it. +func setVisitor(visitorConfig *driver.TemplateVisitorConfig, twin *common.Twin, dev *driver.CustomizedDev) error { + if twin.PVisitor.PProperty.AccessMode == "ReadOnly" { + klog.V(1).Infof("%s twin readonly property: %s", dev.Instance.Name, twin.PropertyName) + return nil + } + klog.V(2).Infof("Convert type: %s, value: %s ", twin.PVisitor.PProperty.DataType, twin.Desired.Value) + value, err := common.Convert(twin.PVisitor.PProperty.DataType, twin.Desired.Value) + if err != nil { + klog.Errorf("Failed to convert value as %s : %v", twin.PVisitor.PProperty.DataType, err) + return err + } + err = dev.CustomizedClient.SetDeviceData(value, visitorConfig) + if err != nil { + return fmt.Errorf("%s set device data error: %v", twin.PropertyName, err) + } + return nil +} + +// DevInit initialize the device +func (d *DevPanel) DevInit(cfg *config.Config) error { + devs := make(map[string]*common.DeviceInstance) + + switch cfg.DevInit.Mode { + case common.DevInitModeConfigmap: + if err := parse.Parse(cfg.DevInit.Configmap, devs, d.models, d.protocols); err != nil { + return err + } + case common.DevInitModeRegister: + if err := parse.ParseByUsingRegister(cfg, devs, d.models, d.protocols); err != nil { + return err + } + } + + for key, deviceInstance := range devs { + cur := new(driver.CustomizedDev) + cur.Instance = *deviceInstance + d.devices[key] = cur + } + return nil +} + +// UpdateDev stop old device, then update and start new device +func (d *DevPanel) UpdateDev(model *common.DeviceModel, device *common.DeviceInstance, protocol *common.Protocol) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + + if oldDevice, ok := d.devices[device.ID]; ok { + err := d.stopDev(oldDevice, device.ID) + if err != nil { + klog.Error(err) + } + } + // start new device + d.devices[device.ID] = new(driver.CustomizedDev) + d.devices[device.ID].Instance = *device + d.models[device.ID] = *model + d.protocols[device.ID] = *protocol + + ctx, cancelFunc := context.WithCancel(context.Background()) + d.deviceMuxs[device.ID] = cancelFunc + d.wg.Add(1) + go d.start(ctx, d.devices[device.ID]) +} + +// UpdateDevTwins update device's twins +func (d *DevPanel) UpdateDevTwins(deviceID string, twins []common.Twin) error { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + dev, ok := d.devices[deviceID] + if !ok { + return fmt.Errorf("device %s not found", deviceID) + } + dev.Instance.Twins = twins + model := d.models[dev.Instance.Model] + protocol := d.protocols[dev.Instance.ProtocolName] + d.UpdateDev(&model, &dev.Instance, &protocol) + return nil +} + +// DealDeviceTwinGet get device's twin data +func (d *DevPanel) DealDeviceTwinGet(deviceID string, twinName string) (interface{}, error) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + dev, ok := d.devices[deviceID] + if !ok { + return nil, fmt.Errorf("not found device %s", deviceID) + } + var res []parse.TwinResultResponse + for _, twin := range dev.Instance.Twins { + if twinName != "" && twin.PropertyName != twinName { + continue + } + payload, err := getTwinData(deviceID, twin, d.devices[deviceID]) + if err != nil { + return nil, err + } + item := parse.TwinResultResponse{ + PropertyName: twinName, + Payload: payload, + } + res = append(res, item) + } + return json.Marshal(res) +} + +// getTwinData get twin +func getTwinData(deviceID string, twin common.Twin, dev *driver.CustomizedDev) ([]byte, error) { + var visitorConfig driver.TemplateVisitorConfig + err := json.Unmarshal(twin.PVisitor.VisitorConfig, &visitorConfig) + if err != nil { + return nil, err + } + err = setVisitor(&visitorConfig, &twin, dev) + if err != nil { + return nil, err + } + twinData := &TwinData{ + DeviceName: deviceID, + Client: dev.CustomizedClient, + Name: twin.PropertyName, + Type: twin.Desired.Metadatas.Type, + VisitorConfig: &visitorConfig, + Topic: fmt.Sprintf(common.TopicTwinUpdate, deviceID), + } + return twinData.GetPayLoad() +} + +// GetDevice get device instance +func (d *DevPanel) GetDevice(deviceID string) (interface{}, error) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + found, ok := d.devices[deviceID] + if !ok || found == nil { + return nil, fmt.Errorf("device %s not found", deviceID) + } + + // get the latest reported twin value + for i, twin := range found.Instance.Twins { + payload, err := getTwinData(deviceID, twin, found) + if err != nil { + return nil, err + } + found.Instance.Twins[i].Reported.Value = string(payload) + } + return found, nil +} + +// RemoveDevice remove device instance +func (d *DevPanel) RemoveDevice(deviceID string) error { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + dev := d.devices[deviceID] + delete(d.devices, deviceID) + err := d.stopDev(dev, deviceID) + if err != nil { + return err + } + return nil +} + +// stopDev stop device and goroutine +func (d *DevPanel) stopDev(dev *driver.CustomizedDev, id string) error { + cancelFunc, ok := d.deviceMuxs[id] + if !ok { + return fmt.Errorf("can not find device %s from device muxs", id) + } + + err := dev.CustomizedClient.StopDevice() + if err != nil { + klog.Errorf("stop device %s error: %v", id, err) + } + cancelFunc() + return nil +} + +// GetModel if the model exists, return device model +func (d *DevPanel) GetModel(modelName string) (common.DeviceModel, error) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + if model, ok := d.models[modelName]; ok { + return model, nil + } + return common.DeviceModel{}, fmt.Errorf("deviceModel %s not found", modelName) +} + +// UpdateModel update device model +func (d *DevPanel) UpdateModel(model *common.DeviceModel) { + d.serviceMutex.Lock() + d.models[model.Name] = *model + d.serviceMutex.Unlock() +} + +// RemoveModel remove device model +func (d *DevPanel) RemoveModel(modelName string) { + d.serviceMutex.Lock() + delete(d.models, modelName) + d.serviceMutex.Unlock() +} diff --git a/_template/mapper-dmi/device/twindata.go b/_template/mapper-dmi/device/twindata.go new file mode 100644 index 00000000..21259f0e --- /dev/null +++ b/_template/mapper-dmi/device/twindata.go @@ -0,0 +1,82 @@ +package device + +import ( + "encoding/json" + "fmt" + "strings" + + "k8s.io/klog/v2" + + dmiapi "github.com/kubeedge/kubeedge/pkg/apis/dmi/v1alpha1" + "github.com/kubeedge/mappers-go/mappers/Template/driver" + "github.com/kubeedge/mappers-go/pkg/common" + "github.com/kubeedge/mappers-go/pkg/util/grpcclient" + "github.com/kubeedge/mappers-go/pkg/util/parse" +) + +type TwinData struct { + DeviceName string + Client *driver.CustomizedClient + Name string + Type string + VisitorConfig *driver.TemplateVisitorConfig + Results interface{} + Topic string +} + +func (td *TwinData) GetPayLoad() ([]byte, error) { + var err error + td.Results, err = td.Client.GetDeviceData(td.VisitorConfig) + if err != nil { + return nil, fmt.Errorf("get device data failed: %v", err) + } + sData, err := common.ConvertToString(td.Results) + if err != nil { + klog.Errorf("Failed to convert %s %s value as string : %v", td.DeviceName, td.Name, err) + return nil, err + } + if len(sData) > 30 { + klog.V(4).Infof("Get %s : %s ,value is %s......", td.DeviceName, td.Name, sData[:30]) + } else { + klog.V(4).Infof("Get %s : %s ,value is %s", td.DeviceName, td.Name, sData) + } + var payload []byte + if strings.Contains(td.Topic, "$hw") { + if payload, err = common.CreateMessageTwinUpdate(td.Name, td.Type, sData); err != nil { + return nil, fmt.Errorf("create message twin update failed: %v", err) + } + } else { + if payload, err = common.CreateMessageData(td.Name, td.Type, sData); err != nil { + return nil, fmt.Errorf("create message data failed: %v", err) + } + } + return payload, nil +} + +func (td *TwinData) Run() { + payload, err := td.GetPayLoad() + if err != nil { + klog.Errorf("twindata %s unmarshal failed, err: %s", td.Name, err) + return + } + + var msg common.DeviceTwinUpdate + if err = json.Unmarshal(payload, &msg); err != nil { + klog.Errorf("twindata %s unmarshal failed, err: %s", td.Name, err) + return + } + + twins := parse.ConvMsgTwinToGrpc(msg.Twin) + + var rdsr = &dmiapi.ReportDeviceStatusRequest{ + DeviceName: td.DeviceName, + ReportedDevice: &dmiapi.DeviceStatus{ + Twins: twins, + State: "OK", + }, + } + + if err := grpcclient.ReportDeviceStatus(rdsr); err != nil { + klog.Errorf("fail to report device status of %s with err: %+v", rdsr.DeviceName, err) + } +} diff --git a/_template/mapper-dmi/driver/devicetype.go b/_template/mapper-dmi/driver/devicetype.go new file mode 100644 index 00000000..00fa157c --- /dev/null +++ b/_template/mapper-dmi/driver/devicetype.go @@ -0,0 +1,44 @@ +package driver + +import ( + "sync" + + "github.com/kubeedge/mappers-go/pkg/common" +) + +// CustomizedDev is the customized device configuration and client information. +type CustomizedDev struct { + Instance common.DeviceInstance + CustomizedClient *CustomizedClient +} + +type CustomizedClient struct { + deviceMutex sync.Mutex + TemplateProtocolCommonConfig + TemplateProtocolConfig +} + +type TemplateProtocolConfig struct { + ProtocolName string `json:"protocolName"` + ProtocolConfigData `json:"configData"` +} + +type ProtocolConfigData struct { + // TODO: add your config data according to configmap +} + +type TemplateProtocolCommonConfig struct { + CommonCustomizedValues `json:"customizedValues"` +} + +type CommonCustomizedValues struct { + // TODO: add your CommonCustomizedValues according to configmap +} +type TemplateVisitorConfig struct { + ProtocolName string `json:"protocolName"` + VisitorConfigData `json:"configData"` +} + +type VisitorConfigData struct { + // TODO: add your Visitor ConfigData according to configmap +} diff --git a/_template/mapper-dmi/driver/driver.go b/_template/mapper-dmi/driver/driver.go new file mode 100644 index 00000000..42b2f892 --- /dev/null +++ b/_template/mapper-dmi/driver/driver.go @@ -0,0 +1,39 @@ +package driver + +import ( + "sync" +) + +func NewClient(commonProtocol TemplateProtocolCommonConfig, + protocol TemplateProtocolConfig) (*CustomizedClient, error) { + client := &CustomizedClient{ + TemplateProtocolCommonConfig: commonProtocol, + TemplateProtocolConfig: protocol, + deviceMutex: sync.Mutex{}, + } + return client, nil +} + +func (c *CustomizedClient) InitDevice() error { + // TODO: add init operation + // you can use c.TemplateProtocolConfig and c.TemplateProtocolCommonConfig + return nil +} + +func (c *CustomizedClient) GetDeviceData(visitor *TemplateVisitorConfig) (interface{}, error) { + // TODO: get device's data + // you can use c.TemplateProtocolConfig,c.TemplateProtocolCommonConfig and visitor + return nil, nil +} + +func (c *CustomizedClient) SetDeviceData(data interface{}, visitor *TemplateVisitorConfig) error { + // TODO: set device's data + // you can use c.TemplateProtocolConfig,c.TemplateProtocolCommonConfig and visitor + return nil +} + +func (c *CustomizedClient) StopDevice() error { + // TODO: stop device + // you can use c.TemplateProtocolConfig and c.TemplateProtocolCommonConfig + return nil +} diff --git a/_template/mapper-dmi/hack/make-rules/mapper.sh b/_template/mapper-dmi/hack/make-rules/mapper.sh new file mode 100644 index 00000000..74715aa0 --- /dev/null +++ b/_template/mapper-dmi/hack/make-rules/mapper.sh @@ -0,0 +1,190 @@ +#!/usr/bin/env bash + +set -o errexit +set -o nounset +set -o pipefail + +CURR_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd -P)" +ROOT_DIR="$(cd "${CURR_DIR}/../.." && pwd -P)" +source "${ROOT_DIR}/hack/lib/init.sh" + +mkdir -p "${CURR_DIR}/bin" +mkdir -p "${CURR_DIR}/dist" + +function mod() { + [[ "${2:-}" != "only" ]] + local mapper="${1}" + + # the mapper is sharing the vendor with root + pushd "${ROOT_DIR}" >/dev/null || exist 1 + echo "downloading dependencies for mapper ${mapper}..." + + if [[ "$(go env GO111MODULE)" == "off" ]]; then + echo "go mod has been disabled by GO111MODULE=off" + else + echo "tidying" + go mod tidy + echo "vending" + go mod vendor + fi + + echo "...done" + popd >/dev/null || return +} + +function lint() { + [[ "${2:-}" != "only" ]] && mod "$@" + local mapper="${1}" + + echo "fmt and linting mapper ${mapper}..." + + gofmt -s -w "${CURR_DIR}/" + golangci-lint run "${CURR_DIR}/..." + + echo "...done" +} + +function build() { + [[ "${2:-}" != "only" ]] && lint "$@" + local mapper="${1}" + + local flags=" -w -s " + local ext_flags=" -extldflags '-static' " + local os="${OS:-$(go env GOOS)}" + local arch="${ARCH:-$(go env GOARCH)}" + + local platform + if [[ "${ARM:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm" + platform=("linux/arm") + elif [[ "${ARM64:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm64" + platform=("linux/arm64") + else + local os="${OS:-$(go env GOOS)}" + local arch="${ARCH:-$(go env GOARCH)}" + platform=("${os}/${arch}") + fi + + echo "building ${platform}" + + local os_arch + IFS="/" read -r -a os_arch <<<"${platform}" + local os=${os_arch[0]} + local arch=${os_arch[1]} + GOOS=${os} GOARCH=${arch} CGO_ENABLED=0 go build \ + -ldflags "${flags} ${ext_flags}" \ + -o "${CURR_DIR}/bin/${mapper}_${os}_${arch}" \ + "${CURR_DIR}/cmd/main.go" + + cp ${CURR_DIR}/bin/${mapper}_${os}_${arch} ${CURR_DIR}/bin/${mapper} + echo "...done" +} + +function package() { + [[ "${2:-}" != "only" ]] && build "$@" + local mapper="${1}" + + echo "packaging mapper ${mapper}..." + + local image_name="${mapper}-mapper" + local tag=v1.0 + + local platform + if [[ "${ARM:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm" + platform=("linux/arm") + elif [[ "${ARM64:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm64" + platform=("linux/arm64") + else + local os="${OS:-$(go env GOOS)}" + local arch="${ARCH:-$(go env GOARCH)}" + platform=("${os}/${arch}") + fi + + pushd "${CURR_DIR}" >/dev/null 2>&1 + if [[ "${platform}" =~ darwin/* ]]; then + echo "package into Darwin OS image is unavailable, please use CROSS=true env to containerize multiple arch images or use OS=linux ARCH=amd64 env to containerize linux/amd64 image" + fi + + local image_tag="${image_name}:${tag}-${platform////-}" + echo "packaging ${image_tag}" + sudo docker build \ + --platform "${platform}" \ + -t "${image_tag}" . + popd >/dev/null 2>&1 + + echo "...done" +} + +function test() { + [[ "${2:-}" != "only" ]] && build "$@" + local mapper="${1}" + + echo "running unit tests for mapper ${mapper}..." + + local unit_test_targets=( + "${CURR_DIR}/config/..." + "${CURR_DIR}/configmap/..." + "${CURR_DIR}/device/..." + "${CURR_DIR}/driver/..." + ) + + local os="${OS:-$(go env GOOS)}" + local arch="${ARCH:-$(go env GOARCH)}" + if [[ "${arch}" == "arm" ]]; then + # NB(thxCode): race detector doesn't support `arm` arch, ref to: + # - https://golang.org/doc/articles/race_detector.html#Supported_Systems + GOOS=${os} GOARCH=${arch} CGO_ENABLED=1 go test \ + -cover -coverprofile "${CURR_DIR}/dist/coverage_${mapper}_${os}_${arch}.out" \ + "${unit_test_targets[@]}" + else + GOOS=${os} GOARCH=${arch} CGO_ENABLED=1 go test \ + -race \ + -cover -coverprofile "${CURR_DIR}/dist/coverage_${mapper}_${os}_${arch}.out" \ + "${unit_test_targets[@]}" + fi + + echo "...done" +} + +function clean() { + local mapper="${1}" + + echo "cleanup mapper ${mapper}..." + + rm -rf "${CURR_DIR}/bin/*" + + echo "...done" +} + +function entry() { + local mapper="${1:-}" + shift 1 + + local stages="${1:-build}" + shift $(($# > 0 ? 1 : 0)) + + IFS="," read -r -a stages <<<"${stages}" + local commands=$* + if [[ ${#stages[@]} -ne 1 ]]; then + commands="only" + fi + + for stage in "${stages[@]}"; do + echo "# make mapper ${mapper} ${stage} ${commands}" + case ${stage} in + m | mod) mod "${mapper}" "${commands}" ;; + l | lint) lint "${mapper}" "${commands}" ;; + b | build) build "${mapper}" "${commands}" ;; + p | pkg | package) package "${mapper}" "${commands}" ;; + t | test) test "${mapper}" "${commands}" ;; + c | clean) clean "${mapper}" "${commands}" ;; + *) echo "unknown action '${stage}', select from mod,lint,build,test,clean" ;; + esac + done +} + +echo $@ +entry "$@" diff --git a/config/config.go b/config/config.go index 8086ba38..6b00beb2 100644 --- a/config/config.go +++ b/config/config.go @@ -67,7 +67,7 @@ func (c *Config) Parse() error { pflag.StringVar(&loglevel, "v", "1", "log level") pflag.StringVar(&configFile, "config-file", defaultConfigFile, "Config file name") - + pflag.Parse() cf, err := ioutil.ReadFile(configFile) if err != nil { return err @@ -81,13 +81,11 @@ func (c *Config) Parse() error { switch c.DevInit.Mode { case common.DevInitModeConfigmap: - if readFile, err := ioutil.ReadFile(c.DevInit.Configmap); err != nil { + if _, err := ioutil.ReadFile(c.DevInit.Configmap); err != nil { if !os.IsNotExist(err) { return err } c.DevInit.Configmap = strings.TrimSpace(os.Getenv("DEVICE_PROFILE")) - } else { - c.DevInit.Configmap = string(readFile) } if strings.TrimSpace(c.DevInit.Configmap) == "" { return errors.New("can not parse configmap") diff --git a/go.mod b/go.mod index f9411449..0c29d01b 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,11 @@ require ( github.com/beevik/etree v1.1.0 github.com/currantlabs/ble v0.0.0-20171229162446-c1d21c164cf8 github.com/eclipse/paho.mqtt.golang v1.3.0 + github.com/fatih/structs v1.1.0 github.com/go-resty/resty/v2 v2.7.0 github.com/goburrow/modbus v0.1.0 github.com/goburrow/serial v0.1.0 + github.com/golang/protobuf v1.5.2 github.com/gopcua/opcua v0.1.13 github.com/gorilla/mux v1.8.0 github.com/kubeedge/kubeedge v1.12.0-beta.0 @@ -24,6 +26,7 @@ require ( github.com/use-go/onvif v0.0.1 golang.org/x/net v0.0.0-20220225172249-27dd8689420f google.golang.org/grpc v1.47.0 + google.golang.org/protobuf v1.27.1 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.24.1 k8s.io/apimachinery v0.24.1 @@ -45,7 +48,6 @@ require ( github.com/go-openapi/swag v0.19.14 // indirect github.com/gofrs/uuid v3.2.0+incompatible // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/protobuf v1.5.2 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect github.com/google/go-cmp v0.5.6 // indirect github.com/google/gofuzz v1.1.0 // indirect @@ -73,7 +75,6 @@ require ( golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect - google.golang.org/protobuf v1.27.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect diff --git a/go.sum b/go.sum index ecfb1768..c2e2ca88 100644 --- a/go.sum +++ b/go.sum @@ -409,6 +409,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d/go.mod h1:ZZMPRZwes7CROmyNKgQzC3XPs6L/G2EJLHddWejkmf4= github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= +github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:rZfgFAXFS/z/lEd6LJmf9HVZ1LkgYiHx5pHhV5DR16M= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= diff --git a/hack/make-rules/dmitemplate.sh b/hack/make-rules/dmitemplate.sh new file mode 100755 index 00000000..53d959fd --- /dev/null +++ b/hack/make-rules/dmitemplate.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash + +set -o errexit +set -o nounset +set -o pipefail + +CURR_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd -P)" +# The root of the octopus directory +ROOT_DIR="${CURR_DIR}" + +function entry() { + # copy template + read -p "Please input the mapper name (like 'Bluetooth', 'BLE'): " -r mapperName + if [[ -z "${mapperName}" ]]; then + echo "the mapper name is required" + exit 1 + fi + mapperNameLowercase=$(echo -n "${mapperName}" | tr '[:upper:]' '[:lower:]') + mapperPath="${ROOT_DIR}/mappers/${mapperNameLowercase}" + if [[ -d "${mapperPath}" ]]; then + echo "the directory is existed" + exit 1 + fi + cp -r "${ROOT_DIR}/_template/mapper-dmi" "${mapperPath}" + + mapperVar=$(echo "${mapperName}" | sed -e "s/\b\(.\)/\\u\1/g") + sed -i "s/Template/${mapperVar}/g" `grep Template -rl ${mapperPath}` + sed -i "s/mappers\/${mapperVar}/mappers\/${mapperNameLowercase}/g" `grep "mappers\/${mapperVar}" -rl ${ROOT_DIR}/mappers/${mapperNameLowercase}` + sed -i "s/${mapperVar}/${mapperNameLowercase}/g" ${mapperPath}/Dockerfile + # gofmt + go fmt "${mapperPath}/..." >/dev/null 2>&1 + echo "You can find your customized mapper in mappers " +} + +entry "$@" \ No newline at end of file diff --git a/mappers/modbus-dmi/cmd/main.go b/mappers/modbus-dmi/cmd/main.go index b269c5c5..aea27fb9 100644 --- a/mappers/modbus-dmi/cmd/main.go +++ b/mappers/modbus-dmi/cmd/main.go @@ -48,6 +48,7 @@ func main() { SockPath: c.GrpcServer.SocketPath, Protocol: common.ProtocolModbus, }, + device.NewDevPanel(), ) panel := device.NewDevPanel() diff --git a/mappers/virtualdevice-dmi/Dockerfile b/mappers/virtualdevice-dmi/Dockerfile new file mode 100644 index 00000000..bcb43c60 --- /dev/null +++ b/mappers/virtualdevice-dmi/Dockerfile @@ -0,0 +1,10 @@ +FROM ubuntu:16.04 + +RUN mkdir -p kubeedge + +COPY ./bin/virtualdevice-dmi kubeedge/ +COPY ./config.yaml kubeedge/ + +WORKDIR kubeedge + +CMD ./virtualdevice-dmi diff --git a/mappers/virtualdevice-dmi/Makefile b/mappers/virtualdevice-dmi/Makefile new file mode 100644 index 00000000..710076bf --- /dev/null +++ b/mappers/virtualdevice-dmi/Makefile @@ -0,0 +1,36 @@ +SHELL := /bin/bash + +curr_dir := $(patsubst %/,%,$(dir $(abspath $(lastword $(MAKEFILE_LIST))))) +rest_args := $(wordlist 2, $(words $(MAKECMDGOALS)), $(MAKECMDGOALS)) +$(eval $(rest_args):;@:) + +help: + # + # Usage: + # make template : create a mapper based on a template. + # make mapper {mapper-name} : execute mapper building process. + # make all : execute building process to all mappers. + # + # Actions: + # - mod, m : download code dependencies. + # - lint, l : verify code via go fmt and `golangci-lint`. + # - build, b : compile code. + # - package, p : package docker image. + # - test, t : run unit tests. + # - clean, c : clean output binary. + # + # Parameters: + # ARM : true or undefined + # ARM64 : true or undefined + # + # Example: + # - make mapper modbus ARM64=true : execute `build` "modbus" mapper for ARM64. + # - make mapper modbus test : execute `test` "modbus" mapper. + @echo + +make_rules := $(shell ls $(curr_dir)/hack/make-rules | sed 's/.sh//g') +$(make_rules): + @$(curr_dir)/hack/make-rules/$@.sh $(rest_args) + +.DEFAULT_GOAL := help +.PHONY: $(make_rules) build test package \ No newline at end of file diff --git a/mappers/virtualdevice-dmi/README.md b/mappers/virtualdevice-dmi/README.md new file mode 100644 index 00000000..9c0b837b --- /dev/null +++ b/mappers/virtualdevice-dmi/README.md @@ -0,0 +1,27 @@ +# MapperDMI Example +Use a sample device to guide you to use mapper by dmi + +## Function introduction +Generate random numbers and report them to the cloud, and control +the range of generating numbers according to the value returned by the cloud + +## How to run this code + +### Start Mapper +Ensure that the running node of the code is in nodeSelector's list +```shell +cd cmd +``` +```shell +go run main.go --v 4 --config-file=../config.yaml +``` + +### Add Device +1. copy ```resource/random-device-model.yaml``` and ```resource/random-device-instance.yaml``` to cloud node +2. ```kubectl apply -f random-device-model.yaml``` +3. Modify the nodeSelector in ```random-device-instance.yaml```, make the value of the field the name of your edge node. (You can get the edge node name by ```kubectl get nodes```) +4. ```kubectl apply -f random-device-instance.yaml``` +5. Ensure that the device has been added correctly: ```kubectl get device``` + +### Warning +This demo is for testing purposes only. If you want to deploy in a production environment, please use container deployment. And you can get more details by ```make```. \ No newline at end of file diff --git a/mappers/virtualdevice-dmi/cmd/main.go b/mappers/virtualdevice-dmi/cmd/main.go new file mode 100644 index 00000000..20faeeb1 --- /dev/null +++ b/mappers/virtualdevice-dmi/cmd/main.go @@ -0,0 +1,63 @@ +package main + +import ( + "errors" + "os" + + "k8s.io/klog/v2" + + "github.com/kubeedge/mappers-go/config" + "github.com/kubeedge/mappers-go/mappers/virtualdevice-dmi/device" + "github.com/kubeedge/mappers-go/pkg/common" + "github.com/kubeedge/mappers-go/pkg/grpcserver" + "github.com/kubeedge/mappers-go/pkg/util/grpcclient" + "github.com/kubeedge/mappers-go/pkg/util/parse" +) + +func main() { + var err error + var c config.Config + + klog.InitFlags(nil) + defer klog.Flush() + + if err = c.Parse(); err != nil { + klog.Fatal(err) + os.Exit(1) + } + klog.Infof("config: %+v", c) + + grpcclient.Init(&c) + + // start grpc server + grpcServer := grpcserver.NewServer( + grpcserver.Config{ + SockPath: c.GrpcServer.SocketPath, + Protocol: common.ProtocolCustomized, + }, + device.NewDevPanel(), + ) + + panel := device.NewDevPanel() + err = panel.DevInit(&c) + if err != nil && !errors.Is(err, parse.ErrEmptyData) { + klog.Fatal(err) + } + klog.Infoln("devInit finished") + + // register to edgecore + // if dev init mode is register, mapper's dev will init when registry to edgecore + if c.DevInit.Mode != common.DevInitModeRegister { + klog.Infoln("======dev init mode is not register, will register to edgecore") + // TODO health check + if _, _, err = grpcclient.RegisterMapper(&c, false); err != nil { + klog.Fatal(err) + } + klog.Infoln("registerMapper finished") + } + go panel.DevStart() + defer grpcServer.Stop() + if err = grpcServer.Start(); err != nil { + klog.Fatal(err) + } +} diff --git a/mappers/virtualdevice-dmi/config.yaml b/mappers/virtualdevice-dmi/config.yaml new file mode 100644 index 00000000..898c11d6 --- /dev/null +++ b/mappers/virtualdevice-dmi/config.yaml @@ -0,0 +1,13 @@ +grpc_server: + socket_path: /etc/kubeedge/virtualdevice.sock +common: + name: test-dmi-mapper + version: v1.13.0 + api_version: v1.0.0 + protocol: virtualProtocol + address: 127.0.0.1 + edgecore_sock: /etc/kubeedge/dmi.sock +dev_init: + mode: register #register/configmap + configmap: ../test.json + diff --git a/mappers/virtualdevice-dmi/device/device.go b/mappers/virtualdevice-dmi/device/device.go new file mode 100644 index 00000000..aae07136 --- /dev/null +++ b/mappers/virtualdevice-dmi/device/device.go @@ -0,0 +1,344 @@ +package device + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/signal" + "sync" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/mappers-go/config" + "github.com/kubeedge/mappers-go/mappers/virtualdevice-dmi/driver" + "github.com/kubeedge/mappers-go/pkg/common" + "github.com/kubeedge/mappers-go/pkg/util/parse" +) + +type DevPanel struct { + deviceMuxs map[string]context.CancelFunc + devices map[string]*driver.CustomizedDev + models map[string]common.DeviceModel + protocols map[string]common.Protocol + wg sync.WaitGroup + serviceMutex sync.Mutex + quitChan chan os.Signal +} + +var ( + devPanel *DevPanel + once sync.Once +) + +// NewDevPanel init and return devPanel +func NewDevPanel() *DevPanel { + once.Do(func() { + devPanel = &DevPanel{ + deviceMuxs: make(map[string]context.CancelFunc), + devices: make(map[string]*driver.CustomizedDev), + models: make(map[string]common.DeviceModel), + protocols: make(map[string]common.Protocol), + wg: sync.WaitGroup{}, + serviceMutex: sync.Mutex{}, + quitChan: make(chan os.Signal), + } + }) + return devPanel +} + +// DevStart start all devices. +func (d *DevPanel) DevStart() { + for id, dev := range d.devices { + klog.V(4).Info("Dev: ", id, dev) + ctx, cancel := context.WithCancel(context.Background()) + d.deviceMuxs[id] = cancel + d.wg.Add(1) + go d.start(ctx, dev) + } + signal.Notify(d.quitChan, os.Interrupt) + go func() { + <-d.quitChan + for id, device := range d.devices { + err := device.CustomizedClient.StopDevice() + if err != nil { + klog.Errorf("Service has stopped but failed to stop %s:%v", id, err) + } + } + klog.V(1).Info("Exit mapper") + os.Exit(1) + }() + d.wg.Wait() +} + +// start the device +func (d *DevPanel) start(ctx context.Context, dev *driver.CustomizedDev) { + defer d.wg.Done() + + var protocolConfig driver.CustomizedDeviceProtocolConfig + if err := json.Unmarshal(dev.Instance.PProtocol.ProtocolConfigs, &protocolConfig); err != nil { + klog.Errorf("Unmarshal ProtocolConfigs error: %v", err) + return + } + var protocolCommonConfig driver.CustomizedDeviceProtocolCommonConfig + if err := json.Unmarshal(dev.Instance.PProtocol.ProtocolCommonConfig, &protocolCommonConfig); err != nil { + klog.Errorf("Unmarshal ProtocolCommonConfig error: %v", err) + return + } + + client, err := driver.NewClient(protocolCommonConfig, protocolConfig) + if err != nil { + klog.Errorf("Init dev %s error: %v", dev.Instance.Name, err) + return + } + dev.CustomizedClient = client + err = dev.CustomizedClient.InitDevice() + if err != nil { + klog.Errorf("Init device %s error: %v", dev.Instance.ID, err) + return + } + go initTwin(ctx, dev) + <-ctx.Done() +} + +// initTwin initialize the timer to get twin value. +func initTwin(ctx context.Context, dev *driver.CustomizedDev) { + for _, twin := range dev.Instance.Twins { + var visitorConfig driver.CustomizedDeviceVisitorConfig + err := json.Unmarshal(twin.PVisitor.VisitorConfig, &visitorConfig) + if err != nil { + klog.Errorf("Unmarshal VisitorConfig error: %v", err) + continue + } + err = setVisitor(&visitorConfig, &twin, dev) + if err != nil { + klog.Error(err) + continue + } + twinData := &TwinData{ + DeviceName: dev.Instance.Name, + Client: dev.CustomizedClient, + Name: twin.PropertyName, + Type: twin.Desired.Metadatas.Type, + VisitorConfig: &visitorConfig, + Topic: fmt.Sprintf(common.TopicTwinUpdate, dev.Instance.ID), + } + + collectCycle := time.Duration(twin.PVisitor.CollectCycle) + if collectCycle == 0 { + collectCycle = 1 * time.Second + } + ticker := time.NewTicker(collectCycle) + go func() { + for { + select { + case <-ticker.C: + twinData.Run() + case <-ctx.Done(): + return + } + } + }() + } +} + +// setVisitor check if visitor property is readonly, if not then set it. +func setVisitor(visitorConfig *driver.CustomizedDeviceVisitorConfig, twin *common.Twin, dev *driver.CustomizedDev) error { + if twin.PVisitor.PProperty.AccessMode == "ReadOnly" { + klog.V(1).Infof("%s twin readonly property: %s", dev.Instance.Name, twin.PropertyName) + return nil + } + klog.V(2).Infof("Convert type: %s, value: %s ", twin.PVisitor.PProperty.DataType, twin.Desired.Value) + value, err := common.Convert(twin.PVisitor.PProperty.DataType, twin.Desired.Value) + if err != nil { + klog.Errorf("Failed to convert value as %s : %v", twin.PVisitor.PProperty.DataType, err) + return err + } + err = dev.CustomizedClient.SetDeviceData(value, visitorConfig) + if err != nil { + return fmt.Errorf("%s set device data error: %v", twin.PropertyName, err) + } + return nil +} + +// DevInit initialize the device +func (d *DevPanel) DevInit(cfg *config.Config) error { + devs := make(map[string]*common.DeviceInstance) + + switch cfg.DevInit.Mode { + case common.DevInitModeConfigmap: + if err := parse.Parse(cfg.DevInit.Configmap, devs, d.models, d.protocols); err != nil { + return err + } + case common.DevInitModeRegister: + if err := parse.ParseByUsingRegister(cfg, devs, d.models, d.protocols); err != nil { + return err + } + } + + for key, deviceInstance := range devs { + cur := new(driver.CustomizedDev) + cur.Instance = *deviceInstance + d.devices[key] = cur + } + return nil +} + +// UpdateDev stop old device, then update and start new device +func (d *DevPanel) UpdateDev(model *common.DeviceModel, device *common.DeviceInstance, protocol *common.Protocol) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + + if oldDevice, ok := d.devices[device.ID]; ok { + err := d.stopDev(oldDevice, device.ID) + if err != nil { + klog.Error(err) + } + } + // start new device + d.devices[device.ID] = new(driver.CustomizedDev) + d.devices[device.ID].Instance = *device + d.models[device.ID] = *model + d.protocols[device.ID] = *protocol + + ctx, cancelFunc := context.WithCancel(context.Background()) + d.deviceMuxs[device.ID] = cancelFunc + d.wg.Add(1) + go d.start(ctx, d.devices[device.ID]) +} + +// UpdateDevTwins update device's twins +func (d *DevPanel) UpdateDevTwins(deviceID string, twins []common.Twin) error { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + dev, ok := d.devices[deviceID] + if !ok { + return fmt.Errorf("device %s not found", deviceID) + } + dev.Instance.Twins = twins + model := d.models[dev.Instance.Model] + protocol := d.protocols[dev.Instance.ProtocolName] + d.UpdateDev(&model, &dev.Instance, &protocol) + return nil +} + +// DealDeviceTwinGet get device's twin data +func (d *DevPanel) DealDeviceTwinGet(deviceID string, twinName string) (interface{}, error) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + dev, ok := d.devices[deviceID] + if !ok { + return nil, fmt.Errorf("not found device %s", deviceID) + } + var res []parse.TwinResultResponse + for _, twin := range dev.Instance.Twins { + if twinName != "" && twin.PropertyName != twinName { + continue + } + payload, err := getTwinData(deviceID, twin, d.devices[deviceID]) + if err != nil { + return nil, err + } + item := parse.TwinResultResponse{ + PropertyName: twinName, + Payload: payload, + } + res = append(res, item) + } + return json.Marshal(res) +} + +// getTwinData get twin +func getTwinData(deviceID string, twin common.Twin, dev *driver.CustomizedDev) ([]byte, error) { + var visitorConfig driver.CustomizedDeviceVisitorConfig + err := json.Unmarshal(twin.PVisitor.VisitorConfig, &visitorConfig) + if err != nil { + return nil, err + } + err = setVisitor(&visitorConfig, &twin, dev) + if err != nil { + return nil, err + } + twinData := &TwinData{ + DeviceName: deviceID, + Client: dev.CustomizedClient, + Name: twin.PropertyName, + Type: twin.Desired.Metadatas.Type, + VisitorConfig: &visitorConfig, + Topic: fmt.Sprintf(common.TopicTwinUpdate, deviceID), + } + return twinData.GetPayLoad() +} + +// GetDevice get device instance +func (d *DevPanel) GetDevice(deviceID string) (interface{}, error) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + found, ok := d.devices[deviceID] + if !ok || found == nil { + return nil, fmt.Errorf("device %s not found", deviceID) + } + + // get the latest reported twin value + for i, twin := range found.Instance.Twins { + payload, err := getTwinData(deviceID, twin, found) + if err != nil { + return nil, err + } + found.Instance.Twins[i].Reported.Value = string(payload) + } + return found, nil +} + +// RemoveDevice remove device instance +func (d *DevPanel) RemoveDevice(deviceID string) error { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + dev := d.devices[deviceID] + delete(d.devices, deviceID) + err := d.stopDev(dev, deviceID) + if err != nil { + return err + } + return nil +} + +// stopDev stop device and goroutine +func (d *DevPanel) stopDev(dev *driver.CustomizedDev, id string) error { + cancelFunc, ok := d.deviceMuxs[id] + if !ok { + return fmt.Errorf("can not find device %s from device muxs", id) + } + + err := dev.CustomizedClient.StopDevice() + if err != nil { + klog.Errorf("stop device %s error: %v", id, err) + } + cancelFunc() + return nil +} + +// GetModel if the model exists, return device model +func (d *DevPanel) GetModel(modelName string) (common.DeviceModel, error) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + if model, ok := d.models[modelName]; ok { + return model, nil + } + return common.DeviceModel{}, fmt.Errorf("deviceModel %s not found", modelName) +} + +// UpdateModel update device model +func (d *DevPanel) UpdateModel(model *common.DeviceModel) { + d.serviceMutex.Lock() + d.models[model.Name] = *model + d.serviceMutex.Unlock() +} + +// RemoveModel remove device model +func (d *DevPanel) RemoveModel(modelName string) { + d.serviceMutex.Lock() + delete(d.models, modelName) + d.serviceMutex.Unlock() +} diff --git a/mappers/virtualdevice-dmi/device/twindata.go b/mappers/virtualdevice-dmi/device/twindata.go new file mode 100644 index 00000000..6b32af2b --- /dev/null +++ b/mappers/virtualdevice-dmi/device/twindata.go @@ -0,0 +1,82 @@ +package device + +import ( + "encoding/json" + "fmt" + "strings" + + "k8s.io/klog/v2" + + dmiapi "github.com/kubeedge/kubeedge/pkg/apis/dmi/v1alpha1" + "github.com/kubeedge/mappers-go/mappers/virtualdevice-dmi/driver" + "github.com/kubeedge/mappers-go/pkg/common" + "github.com/kubeedge/mappers-go/pkg/util/grpcclient" + "github.com/kubeedge/mappers-go/pkg/util/parse" +) + +type TwinData struct { + DeviceName string + Client *driver.CustomizedClient + Name string + Type string + VisitorConfig *driver.CustomizedDeviceVisitorConfig + Results interface{} + Topic string +} + +func (td *TwinData) GetPayLoad() ([]byte, error) { + var err error + td.Results, err = td.Client.GetDeviceData(td.VisitorConfig) + if err != nil { + return nil, fmt.Errorf("get device data failed: %v", err) + } + sData, err := common.ConvertToString(td.Results) + if err != nil { + klog.Errorf("Failed to convert %s %s value as string : %v", td.DeviceName, td.Name, err) + return nil, err + } + if len(sData) > 30 { + klog.V(4).Infof("Get %s : %s ,value is %s......", td.DeviceName, td.Name, sData[:30]) + } else { + klog.V(4).Infof("Get %s : %s ,value is %s", td.DeviceName, td.Name, sData) + } + var payload []byte + if strings.Contains(td.Topic, "$hw") { + if payload, err = common.CreateMessageTwinUpdate(td.Name, td.Type, sData); err != nil { + return nil, fmt.Errorf("create message twin update failed: %v", err) + } + } else { + if payload, err = common.CreateMessageData(td.Name, td.Type, sData); err != nil { + return nil, fmt.Errorf("create message data failed: %v", err) + } + } + return payload, nil +} + +func (td *TwinData) Run() { + payload, err := td.GetPayLoad() + if err != nil { + klog.Errorf("twindata %s unmarshal failed, err: %s", td.Name, err) + return + } + + var msg common.DeviceTwinUpdate + if err = json.Unmarshal(payload, &msg); err != nil { + klog.Errorf("twindata %s unmarshal failed, err: %s", td.Name, err) + return + } + + twins := parse.ConvMsgTwinToGrpc(msg.Twin) + + var rdsr = &dmiapi.ReportDeviceStatusRequest{ + DeviceName: td.DeviceName, + ReportedDevice: &dmiapi.DeviceStatus{ + Twins: twins, + State: "OK", + }, + } + + if err := grpcclient.ReportDeviceStatus(rdsr); err != nil { + klog.Errorf("fail to report device status of %s with err: %+v", rdsr.DeviceName, err) + } +} diff --git a/mappers/virtualdevice-dmi/driver/devicetype.go b/mappers/virtualdevice-dmi/driver/devicetype.go new file mode 100644 index 00000000..705cd629 --- /dev/null +++ b/mappers/virtualdevice-dmi/driver/devicetype.go @@ -0,0 +1,55 @@ +package driver + +import ( + "sync" + + "github.com/kubeedge/mappers-go/pkg/common" +) + +// CustomizedDev is the customized device configuration and client information. +type CustomizedDev struct { + Instance common.DeviceInstance + CustomizedClient *CustomizedClient +} + +type CustomizedClient struct { + intMaxValue int + deviceMutex sync.Mutex + CustomizedDeviceProtocolCommonConfig + CustomizedDeviceProtocolConfig +} + +type CustomizedDeviceProtocolConfig struct { + ProtocolName string `json:"protocolName"` + ProtocolConfigData `json:"configData"` +} + +type ProtocolConfigData struct { + DeviceID int `json:"deviceID,omitempty"` +} + +type CustomizedDeviceProtocolCommonConfig struct { + Com `json:"com"` + CommonCustomizedValues `json:"customizedValues"` +} + +type Com struct { + SerialPort string `json:"serialPort"` + DataBits int `json:"dataBits"` + BaudRate int `json:"baudRate"` + Parity string `json:"parity"` + StopBits int `json:"stopBits"` +} + +type CommonCustomizedValues struct { + ProtocolID int `json:"protocolID"` +} + +type CustomizedDeviceVisitorConfig struct { + ProtocolName string `json:"protocolName"` + VisitorConfigData `json:"configData"` +} + +type VisitorConfigData struct { + DataType string `json:"dataType"` +} diff --git a/mappers/virtualdevice-dmi/driver/driver.go b/mappers/virtualdevice-dmi/driver/driver.go new file mode 100644 index 00000000..c050f680 --- /dev/null +++ b/mappers/virtualdevice-dmi/driver/driver.go @@ -0,0 +1,57 @@ +package driver + +import ( + "fmt" + "math/rand" + "sync" + + "k8s.io/klog/v2" +) + +func NewClient(commonProtocol CustomizedDeviceProtocolCommonConfig, + protocol CustomizedDeviceProtocolConfig) (*CustomizedClient, error) { + client := &CustomizedClient{ + CustomizedDeviceProtocolCommonConfig: commonProtocol, + CustomizedDeviceProtocolConfig: protocol, + deviceMutex: sync.Mutex{}, + } + return client, nil +} + +func (c *CustomizedClient) InitDevice() error { + // If your devices need to be initialized, do it here. + klog.Infof("Init device%d successful, protocolID: %v", c.DeviceID, c.ProtocolID) + klog.Infof("I can get Info: %v %v ", c.Com.SerialPort, c.Com.BaudRate) + return nil +} + +func (c *CustomizedClient) GetDeviceData(visitor *CustomizedDeviceVisitorConfig) (interface{}, error) { + if visitor.DataType == "int" { + if c.intMaxValue <= 0 { + return nil, fmt.Errorf("max value is %d, should > 0", c.intMaxValue) + } + return rand.Intn(c.intMaxValue), nil + } else if visitor.DataType == "float" { + return rand.Float64(), nil + } else { + return nil, fmt.Errorf("unrecognized data type: %s", visitor.DataType) + } +} + +func (c *CustomizedClient) SetDeviceData(data interface{}, visitor *CustomizedDeviceVisitorConfig) error { + if visitor.DataType == "int" { + c.intMaxValue = int(data.(int64)) + } else { + return fmt.Errorf("unrecognized data type: %s", visitor.DataType) + } + return nil +} + +func (c *CustomizedClient) StopDevice() error { + klog.Infof("Stop device%d successful", c.DeviceID) + return nil +} + +func (c *CustomizedClient) GetDeviceStatus() { + // TODO health check +} diff --git a/mappers/virtualdevice-dmi/hack/make-rules/mapper.sh b/mappers/virtualdevice-dmi/hack/make-rules/mapper.sh new file mode 100755 index 00000000..74715aa0 --- /dev/null +++ b/mappers/virtualdevice-dmi/hack/make-rules/mapper.sh @@ -0,0 +1,190 @@ +#!/usr/bin/env bash + +set -o errexit +set -o nounset +set -o pipefail + +CURR_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd -P)" +ROOT_DIR="$(cd "${CURR_DIR}/../.." && pwd -P)" +source "${ROOT_DIR}/hack/lib/init.sh" + +mkdir -p "${CURR_DIR}/bin" +mkdir -p "${CURR_DIR}/dist" + +function mod() { + [[ "${2:-}" != "only" ]] + local mapper="${1}" + + # the mapper is sharing the vendor with root + pushd "${ROOT_DIR}" >/dev/null || exist 1 + echo "downloading dependencies for mapper ${mapper}..." + + if [[ "$(go env GO111MODULE)" == "off" ]]; then + echo "go mod has been disabled by GO111MODULE=off" + else + echo "tidying" + go mod tidy + echo "vending" + go mod vendor + fi + + echo "...done" + popd >/dev/null || return +} + +function lint() { + [[ "${2:-}" != "only" ]] && mod "$@" + local mapper="${1}" + + echo "fmt and linting mapper ${mapper}..." + + gofmt -s -w "${CURR_DIR}/" + golangci-lint run "${CURR_DIR}/..." + + echo "...done" +} + +function build() { + [[ "${2:-}" != "only" ]] && lint "$@" + local mapper="${1}" + + local flags=" -w -s " + local ext_flags=" -extldflags '-static' " + local os="${OS:-$(go env GOOS)}" + local arch="${ARCH:-$(go env GOARCH)}" + + local platform + if [[ "${ARM:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm" + platform=("linux/arm") + elif [[ "${ARM64:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm64" + platform=("linux/arm64") + else + local os="${OS:-$(go env GOOS)}" + local arch="${ARCH:-$(go env GOARCH)}" + platform=("${os}/${arch}") + fi + + echo "building ${platform}" + + local os_arch + IFS="/" read -r -a os_arch <<<"${platform}" + local os=${os_arch[0]} + local arch=${os_arch[1]} + GOOS=${os} GOARCH=${arch} CGO_ENABLED=0 go build \ + -ldflags "${flags} ${ext_flags}" \ + -o "${CURR_DIR}/bin/${mapper}_${os}_${arch}" \ + "${CURR_DIR}/cmd/main.go" + + cp ${CURR_DIR}/bin/${mapper}_${os}_${arch} ${CURR_DIR}/bin/${mapper} + echo "...done" +} + +function package() { + [[ "${2:-}" != "only" ]] && build "$@" + local mapper="${1}" + + echo "packaging mapper ${mapper}..." + + local image_name="${mapper}-mapper" + local tag=v1.0 + + local platform + if [[ "${ARM:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm" + platform=("linux/arm") + elif [[ "${ARM64:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm64" + platform=("linux/arm64") + else + local os="${OS:-$(go env GOOS)}" + local arch="${ARCH:-$(go env GOARCH)}" + platform=("${os}/${arch}") + fi + + pushd "${CURR_DIR}" >/dev/null 2>&1 + if [[ "${platform}" =~ darwin/* ]]; then + echo "package into Darwin OS image is unavailable, please use CROSS=true env to containerize multiple arch images or use OS=linux ARCH=amd64 env to containerize linux/amd64 image" + fi + + local image_tag="${image_name}:${tag}-${platform////-}" + echo "packaging ${image_tag}" + sudo docker build \ + --platform "${platform}" \ + -t "${image_tag}" . + popd >/dev/null 2>&1 + + echo "...done" +} + +function test() { + [[ "${2:-}" != "only" ]] && build "$@" + local mapper="${1}" + + echo "running unit tests for mapper ${mapper}..." + + local unit_test_targets=( + "${CURR_DIR}/config/..." + "${CURR_DIR}/configmap/..." + "${CURR_DIR}/device/..." + "${CURR_DIR}/driver/..." + ) + + local os="${OS:-$(go env GOOS)}" + local arch="${ARCH:-$(go env GOARCH)}" + if [[ "${arch}" == "arm" ]]; then + # NB(thxCode): race detector doesn't support `arm` arch, ref to: + # - https://golang.org/doc/articles/race_detector.html#Supported_Systems + GOOS=${os} GOARCH=${arch} CGO_ENABLED=1 go test \ + -cover -coverprofile "${CURR_DIR}/dist/coverage_${mapper}_${os}_${arch}.out" \ + "${unit_test_targets[@]}" + else + GOOS=${os} GOARCH=${arch} CGO_ENABLED=1 go test \ + -race \ + -cover -coverprofile "${CURR_DIR}/dist/coverage_${mapper}_${os}_${arch}.out" \ + "${unit_test_targets[@]}" + fi + + echo "...done" +} + +function clean() { + local mapper="${1}" + + echo "cleanup mapper ${mapper}..." + + rm -rf "${CURR_DIR}/bin/*" + + echo "...done" +} + +function entry() { + local mapper="${1:-}" + shift 1 + + local stages="${1:-build}" + shift $(($# > 0 ? 1 : 0)) + + IFS="," read -r -a stages <<<"${stages}" + local commands=$* + if [[ ${#stages[@]} -ne 1 ]]; then + commands="only" + fi + + for stage in "${stages[@]}"; do + echo "# make mapper ${mapper} ${stage} ${commands}" + case ${stage} in + m | mod) mod "${mapper}" "${commands}" ;; + l | lint) lint "${mapper}" "${commands}" ;; + b | build) build "${mapper}" "${commands}" ;; + p | pkg | package) package "${mapper}" "${commands}" ;; + t | test) test "${mapper}" "${commands}" ;; + c | clean) clean "${mapper}" "${commands}" ;; + *) echo "unknown action '${stage}', select from mod,lint,build,test,clean" ;; + esac + done +} + +echo $@ +entry "$@" diff --git a/mappers/virtualdevice-dmi/resource/random-device-instance.yaml b/mappers/virtualdevice-dmi/resource/random-device-instance.yaml new file mode 100644 index 00000000..5ea29208 --- /dev/null +++ b/mappers/virtualdevice-dmi/resource/random-device-instance.yaml @@ -0,0 +1,65 @@ +apiVersion: devices.kubeedge.io/v1alpha2 +kind: Device +metadata: + name: random-instance-01 + labels: + model: random-01 +spec: + deviceModelRef: + name: random-01 + protocol: + customizedProtocol: + protocolName: virtualProtocol + configData: + deviceID: 2 + common: + com: + serialPort: '/dev/ttyS0' + baudRate: 9600 + dataBits: 8 + parity: even + stopBits: 1 + customizedValues: + protocolID: 1 + nodeSelector: + nodeSelectorTerms: + - matchExpressions: + - key: '' + operator: In + values: + - edge-node + propertyVisitors: + - propertyName: random-int + customizedProtocol: + protocolName: virtualProtocol + configData: + dataType: int + - propertyName: random-float + customizedProtocol: + protocolName: virtualProtocol + configData: + dataType: float +status: + twins: + - propertyName: random-int + reported: + metadata: + timestamp: '1550049403598' + type: integer + value: "100" + desired: + metadata: + timestamp: '1550049403598' + type: integer + value: "100" + - propertyName: random-float + reported: + metadata: + timestamp: '1550049403598' + type: float + value: "30" + desired: + metadata: + timestamp: '1550049403598' + type: float + value: "30" diff --git a/mappers/virtualdevice-dmi/resource/random-device-model.yaml b/mappers/virtualdevice-dmi/resource/random-device-model.yaml new file mode 100644 index 00000000..37fa275d --- /dev/null +++ b/mappers/virtualdevice-dmi/resource/random-device-model.yaml @@ -0,0 +1,20 @@ +apiVersion: devices.kubeedge.io/v1alpha2 +kind: DeviceModel +metadata: + name: random-01 + namespace: default +spec: + protocol: virtualProtocol + properties: + - name: random-int + description: random int + type: + int: + accessMode: ReadWrite + defaultValue: 100 + - name: random-float + description: random float + type: + float: + accessMode: ReadOnly + defaultValue: 30 diff --git a/pkg/common/data_converter.go b/pkg/common/data_converter.go index 014be729..23bb38df 100644 --- a/pkg/common/data_converter.go +++ b/pkg/common/data_converter.go @@ -17,8 +17,16 @@ limitations under the License. package common import ( + "encoding/json" "errors" + "fmt" + "reflect" "strconv" + "strings" + + "github.com/golang/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/wrapperspb" ) // Convert string to other types @@ -38,3 +46,125 @@ func Convert(valueType string, value string) (result interface{}, err error) { return nil, errors.New("Convert failed") } } + +// ConvertToString other types to string +func ConvertToString(value interface{}) (string, error) { + var result string + if value == nil { + return result, nil + } + switch value.(type) { + case float64: + ft := value.(float64) + result = strconv.FormatFloat(ft, 'f', -1, 64) + case float32: + ft := value.(float32) + result = strconv.FormatFloat(float64(ft), 'f', -1, 64) + case int: + it := value.(int) + result = strconv.Itoa(it) + case uint: + it := value.(uint) + result = strconv.Itoa(int(it)) + case int8: + it := value.(int8) + result = strconv.Itoa(int(it)) + case uint8: + it := value.(uint8) + result = strconv.Itoa(int(it)) + case int16: + it := value.(int16) + result = strconv.Itoa(int(it)) + case uint16: + it := value.(uint16) + result = strconv.Itoa(int(it)) + case int32: + it := value.(int32) + result = strconv.Itoa(int(it)) + case uint32: + it := value.(uint32) + result = strconv.Itoa(int(it)) + case int64: + it := value.(int64) + result = strconv.FormatInt(it, 10) + case uint64: + it := value.(uint64) + result = strconv.FormatUint(it, 10) + case string: + result = value.(string) + case []byte: + result = string(value.([]byte)) + default: + newValue, err := json.Marshal(value) + if err != nil { + return "", err + } + result = string(newValue) + } + return result, nil +} + +// DecodeAnyValue Any to interface +func DecodeAnyValue(value *anypb.Any) (interface{}, error) { + typeURL := value.GetTypeUrl() + + messageTypeName := getMessageTypeName(typeURL) + if messageTypeName == "" { + return nil, fmt.Errorf("cant get message type:%s", typeURL) + } + if strings.Contains(messageTypeName, "google.protobuf.") { + switch messageTypeName { + case "google.protobuf.Int32Value": + return decodeWrapperValue(value, &wrapperspb.Int32Value{}) + case "google.protobuf.StringValue": + return decodeWrapperValue(value, &wrapperspb.StringValue{}) + case "google.protobuf.FloatValue": + return decodeWrapperValue(value, &wrapperspb.FloatValue{}) + case "google.protobuf.BoolValue": + return decodeWrapperValue(value, &wrapperspb.BoolValue{}) + case "google.protobuf.Int64Value": + return decodeWrapperValue(value, &wrapperspb.Int64Value{}) + default: + return nil, fmt.Errorf("unknown type : %s", messageTypeName) + } + + } + messageType := proto.MessageType(messageTypeName) + if messageType == nil { + return nil, fmt.Errorf("cant get message type:%s", messageTypeName) + } + + if !reflect.TypeOf((*proto.Message)(nil)).Elem().AssignableTo(messageType) { + return nil, fmt.Errorf("assiganbleto proto.Message error:%s", messageTypeName) + } + message := reflect.New(messageType.Elem()).Interface().(proto.Message) + if err := proto.Unmarshal(value.Value, message); err != nil { + return nil, fmt.Errorf("unmarshal value error:%v", err) + } + return message, nil +} + +// decodeWrapperValue get proto.Message, then convert to interface +func decodeWrapperValue(value *anypb.Any, wrapper proto.Message) (interface{}, error) { + if err := proto.Unmarshal(value.Value, wrapper); err != nil { + return nil, fmt.Errorf("decode wrapperValue,proto unmarshal error:%v", err) + } + wrapperValue := reflect.ValueOf(wrapper).Elem() + valueField := wrapperValue.FieldByName("Value") + if !valueField.IsValid() { + return nil, fmt.Errorf("cant get wrapperValue") + } + return valueField.Interface(), nil +} + +// getMessageTypeName get type by parse type url +func getMessageTypeName(typeURL string) string { + index := len(typeURL) - 1 + for index >= 0 && typeURL[index] != '/' { + index-- + } + if index >= 0 && index < len(typeURL)-1 { + return typeURL[index+1:] + } + return "" +} diff --git a/pkg/grpcserver/device.go b/pkg/grpcserver/device.go index e692f1a0..4c527c22 100644 --- a/pkg/grpcserver/device.go +++ b/pkg/grpcserver/device.go @@ -4,14 +4,15 @@ import ( "context" "errors" "fmt" + "reflect" "time" + "k8s.io/klog/v2" + dmiapi "github.com/kubeedge/kubeedge/pkg/apis/dmi/v1alpha1" "github.com/kubeedge/mappers-go/pkg/common" "github.com/kubeedge/mappers-go/pkg/driver/modbus" "github.com/kubeedge/mappers-go/pkg/util/parse" - - "k8s.io/klog/v2" ) func (s *Server) RegisterDevice(ctx context.Context, request *dmiapi.RegisterDeviceRequest) (*dmiapi.RegisterDeviceResponse, error) { @@ -21,7 +22,8 @@ func (s *Server) RegisterDevice(ctx context.Context, request *dmiapi.RegisterDev return nil, errors.New("device is nil") } if _, err := s.devPanel.GetDevice(device.Name); err == nil { - return nil, fmt.Errorf("add device %s failed, has existed", device.Name) + // The device has been registered + return &dmiapi.RegisterDeviceResponse{DeviceName: device.Name}, nil } var model common.DeviceModel @@ -167,17 +169,16 @@ func (s *Server) GetDevice(ctx context.Context, request *dmiapi.GetDeviceRequest Status: &dmiapi.DeviceStatus{}, }, } - switch s.cfg.Protocol { - case common.ProtocolModbus: - d := device.(*modbus.ModbusDev) - twins, err := parse.ConvTwinsToGrpc(d.Instance.Twins) - if err != nil { - return nil, err - } - res.Device.Status.Twins = twins - res.Device.Status.State = common.DEVSTOK - default: - return nil, fmt.Errorf("current mapper only support protocol %s's device", s.cfg.Protocol) + deviceValue := reflect.ValueOf(device) + twinsValue := deviceValue.FieldByName("Instance").FieldByName("Twins") + if !twinsValue.IsValid() { + return nil, fmt.Errorf("twins field not found") + } + twins, err := parse.ConvTwinsToGrpc(twinsValue.Interface().([]common.Twin)) + if err != nil { + return nil, err } + res.Device.Status.Twins = twins + res.Device.Status.State = common.DEVSTOK return res, nil } diff --git a/pkg/grpcserver/server.go b/pkg/grpcserver/server.go index fbb33f8f..e979e290 100644 --- a/pkg/grpcserver/server.go +++ b/pkg/grpcserver/server.go @@ -10,8 +10,6 @@ import ( "k8s.io/klog/v2" dmiapi "github.com/kubeedge/kubeedge/pkg/apis/dmi/v1alpha1" - modbusdevice "github.com/kubeedge/mappers-go/mappers/modbus-dmi/device" - "github.com/kubeedge/mappers-go/pkg/common" "github.com/kubeedge/mappers-go/pkg/global" ) @@ -21,26 +19,14 @@ type Config struct { } type Server struct { + lis net.Listener cfg Config devPanel global.DevPanel } -func NewServer(cfg Config) Server { +func NewServer(cfg Config, devPanel global.DevPanel) Server { s := Server{cfg: cfg} - switch cfg.Protocol { - case common.ProtocolModbus: - s.devPanel = modbusdevice.NewDevPanel() - case common.ProtocolBlueTooth: - // TODO - case common.ProtocolOpcua: - // TODO - case common.ProtocolOnvif: - // TODO - case common.ProtocolCustomized: - // TODO - default: - klog.Fatalf("unknown device protocol %s for grpc server", cfg.Protocol) - } + s.devPanel = devPanel return s } @@ -52,7 +38,7 @@ func (s *Server) Start() error { return err } - lis, err := net.Listen("unix", s.cfg.SockPath) + s.lis, err = net.Listen("unix", s.cfg.SockPath) if err != nil { klog.Fatalf("failed to remove uds socket with err: %v", err) return err @@ -60,8 +46,19 @@ func (s *Server) Start() error { grpcServer := grpc.NewServer() dmiapi.RegisterDeviceMapperServiceServer(grpcServer, s) reflection.Register(grpcServer) + klog.Info("start grpc server") + return grpcServer.Serve(s.lis) +} - return grpcServer.Serve(lis) +func (s *Server) Stop() { + err := s.lis.Close() + if err != nil { + return + } + err = os.Remove(s.cfg.SockPath) + if err != nil { + return + } } func initSock(sockPath string) error { diff --git a/pkg/util/parse/grpc.go b/pkg/util/parse/grpc.go index 5a2320e6..707c9b20 100644 --- a/pkg/util/parse/grpc.go +++ b/pkg/util/parse/grpc.go @@ -5,6 +5,7 @@ import ( "errors" "strconv" + "github.com/fatih/structs" "k8s.io/klog/v2" "github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller/constants" @@ -33,10 +34,42 @@ func BuildProtocolFromGrpc(device *dmiapi.Device) (common.Protocol, error) { if err != nil { return common.Protocol{}, err } - protocolCommonConfig, err := json.Marshal(device.Spec.Protocol.Common) - if err != nil { - return common.Protocol{}, err + var protocolCommonConfig []byte + + if device.Spec.Protocol.Common.CustomizedValues != nil { + commonConfig := make(map[string]interface{}) + recvAdapter := make(map[string]interface{}) + for k, v := range device.Spec.Protocol.Common.CustomizedValues.Data { + value, err := common.DecodeAnyValue(v) + if err != nil { + continue + } + recvAdapter[k] = value + } + if device.Spec.Protocol.Common.Com != nil { + commonConfig["com"] = structs.Map(device.Spec.Protocol.Common.Com) + } + if device.Spec.Protocol.Common.Tcp != nil { + commonConfig["tcp"] = structs.Map(device.Spec.Protocol.Common.Tcp) + } + commonConfig["commType"] = device.Spec.Protocol.Common.CommType + commonConfig["reconnTimeout"] = device.Spec.Protocol.Common.ReconnTimeout + commonConfig["reconnRetryTimes"] = device.Spec.Protocol.Common.ReconnRetryTimes + commonConfig["collectTimeout"] = device.Spec.Protocol.Common.CollectTimeout + commonConfig["collectRetryTimes"] = device.Spec.Protocol.Common.CollectRetryTimes + commonConfig["collectType"] = device.Spec.Protocol.Common.CollectType + commonConfig["customizedValues"] = recvAdapter + protocolCommonConfig, err = json.Marshal(commonConfig) + if err != nil { + return common.Protocol{}, err + } + } else { + protocolCommonConfig, err = json.Marshal(device.Spec.Protocol.Common) + if err != nil { + return common.Protocol{}, err + } } + var protocolConfig []byte switch protocolName { case constants.Modbus: @@ -55,7 +88,20 @@ func BuildProtocolFromGrpc(device *dmiapi.Device) (common.Protocol, error) { return common.Protocol{}, err } case constants.CustomizedProtocol: - protocolConfig, err = json.Marshal(device.Spec.Protocol.CustomizedProtocol) + customizedProtocol := make(map[string]interface{}) + customizedProtocol["protocolName"] = device.Spec.Protocol.CustomizedProtocol.ProtocolName + if device.Spec.Protocol.CustomizedProtocol.ConfigData != nil { + recvAdapter := make(map[string]interface{}) + for k, v := range device.Spec.Protocol.CustomizedProtocol.ConfigData.Data { + value, err := common.DecodeAnyValue(v) + if err != nil { + continue + } + recvAdapter[k] = value + } + customizedProtocol["configData"] = recvAdapter + } + protocolConfig, err = json.Marshal(customizedProtocol) if err != nil { return common.Protocol{}, err } @@ -106,7 +152,20 @@ func buildTwinsFromGrpc(device *dmiapi.Device) []common.Twin { return nil } case constants.CustomizedProtocol: - visitorConfig, err = json.Marshal(visitor.CustomizedProtocol) + customizedProtocol := make(map[string]interface{}) + customizedProtocol["protocolName"] = visitor.CustomizedProtocol.ProtocolName + if visitor.CustomizedProtocol.ConfigData != nil { + recvAdapter := make(map[string]interface{}) + for k, v := range visitor.CustomizedProtocol.ConfigData.Data { + value, err := common.DecodeAnyValue(v) + if err != nil { + continue + } + recvAdapter[k] = value + } + customizedProtocol["configData"] = recvAdapter + } + visitorConfig, err = json.Marshal(customizedProtocol) if err != nil { return nil } @@ -211,7 +270,18 @@ func buildPropertyVisitorsFromGrpc(device *dmiapi.Device) []common.PropertyVisit return nil } case constants.CustomizedProtocol: - visitorConfig, err = json.Marshal(pptv.CustomizedProtocol) + recvAdapter := make(map[string]interface{}) + for k, v := range pptv.CustomizedProtocol.ConfigData.Data { + value, err := common.DecodeAnyValue(v) + if err != nil { + continue + } + recvAdapter[k] = value + } + customizedProtocol := make(map[string]interface{}) + customizedProtocol["protocolName"] = pptv.CustomizedProtocol.ProtocolName + customizedProtocol["configData"] = recvAdapter + visitorConfig, err = json.Marshal(customizedProtocol) if err != nil { klog.Errorf("err: %+v", err) return nil diff --git a/pkg/util/parse/parse.go b/pkg/util/parse/parse.go index 467a8db4..ebd25409 100644 --- a/pkg/util/parse/parse.go +++ b/pkg/util/parse/parse.go @@ -36,97 +36,97 @@ func Parse(path string, dms map[string]common.DeviceModel, protocols map[string]common.Protocol) error { var deviceProfile common.DeviceProfile - jsonFile, err := ioutil.ReadFile(path) if err != nil { + err = errors.New("failed to read " + path + " file") return err } - + //Parse the JSON file and convert it into the data structure of DeviceProfile if err = json.Unmarshal(jsonFile, &deviceProfile); err != nil { return err } - - for i := 0; i < len(deviceProfile.DeviceInstances); i++ { - instance := deviceProfile.DeviceInstances[i] - j := 0 - for j = 0; j < len(deviceProfile.Protocols); j++ { - if instance.ProtocolName == deviceProfile.Protocols[j].Name { - instance.PProtocol = deviceProfile.Protocols[j] - break + // loop instIndex : judge whether the configmap definition is correct, and initialize the device instance + for instIndex := 0; instIndex < len(deviceProfile.DeviceInstances); instIndex++ { + instance := deviceProfile.DeviceInstances[instIndex] + // loop protoIndex : judge whether the device's protocol is correct, and initialize the device protocol + protoIndex := 0 + for protoIndex = 0; protoIndex < len(deviceProfile.Protocols); protoIndex++ { + if instance.ProtocolName == deviceProfile.Protocols[protoIndex].Name { + // Verify that the protocols match + protocolConfig := make(map[string]interface{}) + err := json.Unmarshal(deviceProfile.Protocols[protoIndex].ProtocolConfigs, &protocolConfig) + if err != nil { + err = errors.New("failed to parse " + deviceProfile.Protocols[protoIndex].Name) + return err + } + protocols[deviceProfile.Protocols[protoIndex].Name] = deviceProfile.Protocols[protoIndex] + instance.PProtocol = deviceProfile.Protocols[protoIndex] } } - if j == len(deviceProfile.Protocols) { - return errors.New("protocol not found") - } - - if instance.PProtocol.Protocol != "bluetooth" { - continue - } - - for k := 0; k < len(instance.PropertyVisitors); k++ { - modelName := instance.PropertyVisitors[k].ModelName - propertyName := instance.PropertyVisitors[k].PropertyName - l := 0 - for l = 0; l < len(deviceProfile.DeviceModels); l++ { - if modelName == deviceProfile.DeviceModels[l].Name { + // loop propertyIndex : find the device model's properties for each device instance's propertyVisitor + for propertyIndex := 0; propertyIndex < len(instance.PropertyVisitors); propertyIndex++ { + modelName := instance.PropertyVisitors[propertyIndex].ModelName + propertyName := instance.PropertyVisitors[propertyIndex].PropertyName + modelIndex := 0 + // loop modelIndex : find a matching device model, and initialize the device model + for modelIndex = 0; modelIndex < len(deviceProfile.DeviceModels); modelIndex++ { + if modelName == deviceProfile.DeviceModels[modelIndex].Name { + dms[deviceProfile.DeviceModels[modelIndex].Name] = deviceProfile.DeviceModels[modelIndex] m := 0 - for m = 0; m < len(deviceProfile.DeviceModels[l].Properties); m++ { - if propertyName == deviceProfile.DeviceModels[l].Properties[m].Name { - instance.PropertyVisitors[k].PProperty = deviceProfile.DeviceModels[l].Properties[m] + // loop m : find a matching device model's properties + for m = 0; m < len(deviceProfile.DeviceModels[modelIndex].Properties); m++ { + if propertyName == deviceProfile.DeviceModels[modelIndex].Properties[m].Name { + instance.PropertyVisitors[propertyIndex].PProperty = deviceProfile.DeviceModels[modelIndex].Properties[m] break } } - - if m == len(deviceProfile.DeviceModels[l].Properties) { - return errors.New("property not found") + if m == len(deviceProfile.DeviceModels[modelIndex].Properties) { + err = errors.New("property mismatch") + return err } break } } - if l == len(deviceProfile.DeviceModels) { - return errors.New("device model not found") + if modelIndex == len(deviceProfile.DeviceModels) { + err = errors.New("device model mismatch") + return err } } - - for k := 0; k < len(instance.Twins); k++ { - name := instance.Twins[k].PropertyName + // loop propertyIndex : find propertyVisitors for each instance's twin + for propertyIndex := 0; propertyIndex < len(instance.Twins); propertyIndex++ { + name := instance.Twins[propertyIndex].PropertyName l := 0 + // loop l : find a matching propertyName for l = 0; l < len(instance.PropertyVisitors); l++ { if name == instance.PropertyVisitors[l].PropertyName { - instance.Twins[k].PVisitor = &instance.PropertyVisitors[l] + instance.Twins[propertyIndex].PVisitor = &instance.PropertyVisitors[l] break } } if l == len(instance.PropertyVisitors) { - return errors.New("propertyVisitor not found") + err = errors.New("propertyVisitor mismatch") + return err } } - - for k := 0; k < len(instance.Datas.Properties); k++ { - name := instance.Datas.Properties[k].PropertyName + // loop propertyIndex : find propertyVisitors for each instance's property + for propertyIndex := 0; propertyIndex < len(instance.Datas.Properties); propertyIndex++ { + name := instance.Datas.Properties[propertyIndex].PropertyName l := 0 + // loop l : find a matching propertyName for l = 0; l < len(instance.PropertyVisitors); l++ { if name == instance.PropertyVisitors[l].PropertyName { - instance.Datas.Properties[k].PVisitor = &instance.PropertyVisitors[l] + instance.Datas.Properties[propertyIndex].PVisitor = &instance.PropertyVisitors[l] break } } if l == len(instance.PropertyVisitors) { - return errors.New("propertyVisitor not found") + err = errors.New("propertyVisitor mismatch") + return err } } - devices[instance.ID] = new(common.DeviceInstance) devices[instance.ID] = &instance - klog.V(4).Info("Instance: ", instance.ID, instance) - } - - for i := 0; i < len(deviceProfile.DeviceModels); i++ { - dms[deviceProfile.DeviceModels[i].Name] = deviceProfile.DeviceModels[i] - } - - for i := 0; i < len(deviceProfile.Protocols); i++ { - protocols[deviceProfile.Protocols[i].Name] = deviceProfile.Protocols[i] + klog.V(4).Infof("Instance:%s Successfully registered", instance.ID) } return nil } diff --git a/pkg/util/parse/type.go b/pkg/util/parse/type.go index d7f0b24b..0647f388 100644 --- a/pkg/util/parse/type.go +++ b/pkg/util/parse/type.go @@ -336,12 +336,6 @@ func ConvMsgTwinToGrpc(msgTwin map[string]*common.MsgTwin) []*dmiapi.Twin { for name, twin := range msgTwin { twinData := &dmiapi.Twin{ PropertyName: name, - Desired: &dmiapi.TwinProperty{ - Value: *twin.Expected.Value, - Metadata: map[string]string{ - "type": twin.Metadata.Type, - "timestamp": twin.Expected.Metadata.Timestamp, - }}, Reported: &dmiapi.TwinProperty{ Value: *twin.Actual.Value, Metadata: map[string]string{