diff --git a/client/client.go b/client/client.go index 521f85e..1dfdd04 100644 --- a/client/client.go +++ b/client/client.go @@ -1,82 +1,81 @@ package main import ( - "net" - "fmt" - "comm" - "time" - "io" - "miop" - "config" - //"strconv" + "comm" + "config" + "fmt" + "io" + "miop" + "net" + "time" + //"strconv" ) func main() { - fmt.Printf("pls enter uid@product:\n") - var uid string - fmt.Scan(&uid) - var lid string - fmt.Printf("pls end last msgid:\n") - fmt.Scan(&lid) + fmt.Printf("pls enter uid@product:\n") + var uid string + fmt.Scan(&uid) + var lid string + fmt.Printf("pls end last msgid:\n") + fmt.Scan(&lid) - room := getRoom() - if "" == room { - fmt.Printf("no active room\n") - return - } - conn, err := net.Dial("tcp", room) - comm.CheckError(err, "connect error") - - var buf [1024]byte - - msg := miop.MiopDataPack(uint32(123), "i am message") - b := miop.MiopPack("token:7a99ec51ee732b0536e1f93e0bde7737\nu:"+uid+"\nlid:"+lid, msg) - fmt.Printf("send len: %d\n", len(b)) - _, errr := conn.Write(b) - comm.CheckError(errr, "send error") - defer conn.Close() - i := 1 - for { - rlen, err := conn.Read(buf[0:]) - if nil != err { - if err == io.EOF { - println("receive end") - break - } else { - println("server close\n") - } - } - datas, _ := miop.MiopUnpack(buf[:rlen]) - for _, data := range datas { - msgs := data.ParseData() - for _, rmsg := range msgs { - fmt.Printf("receieve: id:%d msg:%s\n", rmsg.Id, rmsg.Msg) - } - } - //fmt.Printf("receieve: %s \n", buf) - //i++ - //says := uid + "say: " + strconv.Itoa(i) - //conn.Write([]byte(says)) - time.Sleep(time.Duration(i) * time.Second) - } + room := getRoom() + if "" == room { + fmt.Printf("no active room\n") + return + } + conn, err := net.Dial("tcp", room) + comm.CheckError(err, "connect error") -} + var buf [1024]byte + + msg := miop.MiopDataPack(uint32(123), "i am message") + b := miop.MiopPack("token:7a99ec51ee732b0536e1f93e0bde7737\nu:"+uid+"\nlid:"+lid, msg) + fmt.Printf("send len: %d\n", len(b)) + _, errr := conn.Write(b) + comm.CheckError(errr, "send error") + defer conn.Close() + i := 1 + for { + rlen, err := conn.Read(buf[0:]) + if nil != err { + if err == io.EOF { + println("receive end") + break + } else { + println("server close\n") + } + } + datas, _ := miop.MiopUnpack(buf[:rlen]) + for _, data := range datas { + msgs := data.ParseData() + for _, rmsg := range msgs { + fmt.Printf("receieve: id:%d msg:%s\n", rmsg.Id, rmsg.Msg) + } + } + //fmt.Printf("receieve: %s \n", buf) + //i++ + //says := uid + "say: " + strconv.Itoa(i) + //conn.Write([]byte(says)) + time.Sleep(time.Duration(i) * time.Second) + } +} func getRoom() string { - conn, err := net.Dial("tcp", config.DispatcherHost) - comm.CheckError(err, "connect err") - defer conn.Close() - - var buf [100]byte - rlen, err := conn.Read(buf[0:]) - - if nil != err { - if err != io.EOF { - comm.CheckError(err, "read error") - } - } + conn, err := net.Dial("tcp", config.DispatcherHost) + comm.CheckError(err, "connect err") + defer conn.Close() + + var buf [100]byte + rlen, err := conn.Read(buf[0:]) + + if nil != err { + if err != io.EOF { + comm.CheckError(err, "read error") + } + } + + return string(buf[0:rlen]) - return string(buf[0:rlen]) - } diff --git a/comm/comm.go b/comm/comm.go index 46268ed..ebbdb9f 100644 --- a/comm/comm.go +++ b/comm/comm.go @@ -1,91 +1,91 @@ package comm import ( - "crypto/md5" - "encoding/hex" - "github.com/hoisie/redis" - "strconv" - "config" - "fmt" - "bytes" + "bytes" + "config" + "crypto/md5" + "encoding/hex" + "fmt" + "github.com/hoisie/redis" + "strconv" ) func CheckError(err error, msg string) { - if nil != err { - str := "ERROR: " + msg + " " + err.Error() - GetLogger(ERROR).Println(msg) - panic(str) - } + if nil != err { + str := "ERROR: " + msg + " " + err.Error() + GetLogger(ERROR).Println(msg) + panic(str) + } } func LogError(err error, msg string) { - if nil != err { - GetLogger(ERROR).Println(msg) - } - + if nil != err { + GetLogger(ERROR).Println(msg) + } + } var client redis.Client func GetMsg(key string) []byte { - return GetData(key) + return GetData(key) } -func GetMsgList(id int) ([][]byte){ - id++ - msgs, err := client.Zrangebyscore(config.BoardCastKey, float64(id), 10000) - LogError(err, "getmsglist error") - return msgs +func GetMsgList(id int) [][]byte { + id++ + msgs, err := client.Zrangebyscore(config.BoardCastKey, float64(id), 10000) + LogError(err, "getmsglist error") + return msgs } func AddMsg(msg string) (bool, int) { - id := setMaxId() - fmt.Printf("get max id: %d\n", id) - mid := strconv.Itoa(id) - msg = mid + "@$@" + msg - ok, err := client.Zadd(config.BoardCastKey, []byte(msg), float64(id)) - CheckError(err, "add msg error") - return ok, id + id := setMaxId() + fmt.Printf("get max id: %d\n", id) + mid := strconv.Itoa(id) + msg = mid + "@$@" + msg + ok, err := client.Zadd(config.BoardCastKey, []byte(msg), float64(id)) + CheckError(err, "add msg error") + return ok, id } -func setMaxId() int{ - id := GetMsg("MSG_MAX_ID") - mid := 0 - if nil != id { - mid, _ = strconv.Atoi(string(id)) - } - mid++ - err := client.Set("MSG_MAX_ID", []byte(strconv.Itoa(mid))) - CheckError(err, "set max id error") - return mid +func setMaxId() int { + id := GetMsg("MSG_MAX_ID") + mid := 0 + if nil != id { + mid, _ = strconv.Atoi(string(id)) + } + mid++ + err := client.Set("MSG_MAX_ID", []byte(strconv.Itoa(mid))) + CheckError(err, "set max id error") + return mid } func SetSendMaxId() { - msgs, err := client.Zrevrange(config.BoardCastKey, 0, 0) - LogError(err, "set send max id error") - id := []byte("0") - if len(msgs) > 0 { - arrs := bytes.Split(msgs[0], []byte("@$@")) - if len(arrs) > 1 { - id = arrs[0] - } - } - - client.Set("SEND_MSG_MAX_ID", id) + msgs, err := client.Zrevrange(config.BoardCastKey, 0, 0) + LogError(err, "set send max id error") + id := []byte("0") + if len(msgs) > 0 { + arrs := bytes.Split(msgs[0], []byte("@$@")) + if len(arrs) > 1 { + id = arrs[0] + } + } + + client.Set("SEND_MSG_MAX_ID", id) } func GetSendMaxId() int { - id := GetMsg("SEND_MSG_MAX_ID") - mid := 0 - if nil != id { - mid, _ = strconv.Atoi(string(id)) - } - return mid + id := GetMsg("SEND_MSG_MAX_ID") + mid := 0 + if nil != id { + mid, _ = strconv.Atoi(string(id)) + } + return mid } func GetData(key string) []byte { - val, _ := client.Get(key) - return val + val, _ := client.Get(key) + return val } func SetData(msg string) { @@ -93,24 +93,23 @@ func SetData(msg string) { } func SaveData(key string, val []byte) bool { - err := client.Set(key, val) - if err != nil { - return false - } - return true + err := client.Set(key, val) + if err != nil { + return false + } + return true } func GetMd5(key string) string { - h := md5.New() - h.Write([]byte(key)) - return hex.EncodeToString(h.Sum(nil)) + h := md5.New() + h.Write([]byte(key)) + return hex.EncodeToString(h.Sum(nil)) } - func MergeByte(b1, b2 []byte) []byte { - tmp := make([][]byte, 0) - tmp[0] = b1 - tmp[1] = b2 - - return bytes.Join(tmp, []byte{}) + tmp := make([][]byte, 0) + tmp[0] = b1 + tmp[1] = b2 + + return bytes.Join(tmp, []byte{}) } diff --git a/comm/logger.go b/comm/logger.go index 184e616..63a3283 100644 --- a/comm/logger.go +++ b/comm/logger.go @@ -1,60 +1,59 @@ package comm import ( - "log" - "os" - "config" - "fmt" - "time" - "strconv" + "config" + "fmt" + "log" + "os" + "strconv" + "time" ) const ( - DEBUG = "debug" - LOG = "log" - ERROR = "error" + DEBUG = "debug" + LOG = "log" + ERROR = "error" ) -var _loggers = make(map[string]*log.Logger , 0) +var _loggers = make(map[string]*log.Logger, 0) func getDate(sep string) string { - year, month, day := time.Now().Date() + year, month, day := time.Now().Date() - return strconv.Itoa(year) + sep + strconv.Itoa(int(month)) + sep + strconv.Itoa(day) + return strconv.Itoa(year) + sep + strconv.Itoa(int(month)) + sep + strconv.Itoa(day) } func checkDir(dir string, mode os.FileMode) { - err := os.Mkdir(dir, mode) - if nil != err { - if os.IsNotExist(err) { - CheckError(err, "mkdir " + dir + " error") - } - } + err := os.Mkdir(dir, mode) + if nil != err { + if os.IsNotExist(err) { + CheckError(err, "mkdir "+dir+" error") + } + } } func GetLogger(filename string) *log.Logger { - logger, ok := _loggers[filename] - if !ok { - dir := config.LogPath + getDate("_") - checkDir(dir, 0777) - logfile, err := os.OpenFile(dir + "/" + filename + ".log", os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0) - if err != nil { - fmt.Printf("%s, %s\r\n", filename, err.Error()) - os.Exit(-1) - } - logger = log.New(logfile, "", log.Ldate|log.Ltime|log.Llongfile) - _loggers[filename] = logger - } - - return logger + logger, ok := _loggers[filename] + if !ok { + dir := config.LogPath + getDate("_") + checkDir(dir, 0777) + logfile, err := os.OpenFile(dir+"/"+filename+".log", os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0) + if err != nil { + fmt.Printf("%s, %s\r\n", filename, err.Error()) + os.Exit(-1) + } + logger = log.New(logfile, "", log.Ldate|log.Ltime|log.Llongfile) + _loggers[filename] = logger + } + + return logger } - func Debug(format string, v ...interface{}) { - GetLogger(DEBUG).Printf(format, v...) + GetLogger(DEBUG).Printf(format, v...) } func Log(format string, v ...interface{}) { - GetLogger(LOG).Printf(format, v...) + GetLogger(LOG).Printf(format, v...) } diff --git a/config/config.go b/config/config.go index 2776b20..88462c7 100644 --- a/config/config.go +++ b/config/config.go @@ -1,31 +1,30 @@ package config import ( - "flag" + "flag" ) var ( - Version = 0 - HostAndPort = ":9090" - BuffLen = 1024 - HeartbeatTime int64 = 10 - LogPath = "/data/go/log/" - Key = "adfop013@dafa" - Deamonize = false - Room = "127.0.0.1:9090" - BoardCastKey = "boardcast_movie" + Version = 0 + HostAndPort = ":9090" + BuffLen = 1024 + HeartbeatTime int64 = 10 + LogPath = "/data/go/log/" + Key = "adfop013@dafa" + Deamonize = false + Room = "127.0.0.1:9090" + BoardCastKey = "boardcast_movie" - RoomList = make(map[string]int) + RoomList = make(map[string]int) ) var ( - RLFile = "/data/go/src/conf/dispatcher.conf" - AdminToken = "piuer813#@80" - AdminUid = "admin" - DispatcherHost = "127.0.0.1:9099" + RLFile = "/data/go/src/conf/dispatcher.conf" + AdminToken = "piuer813#@80" + AdminUid = "admin" + DispatcherHost = "127.0.0.1:9099" ) - func Init() { flag.StringVar(&HostAndPort, "hp", "0.0.0.0:9090", "host and port") flag.StringVar(&LogPath, "log", "/data/go/log/", "lop path") diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go index cf5284a..584219b 100644 --- a/dispatcher/dispatcher.go +++ b/dispatcher/dispatcher.go @@ -1,119 +1,119 @@ package dispatcher import ( - "net" -// "fmt" - "comm" - "time" - "io" - "miop" - "math/rand" - "encoding/json" - "io/ioutil" - "config" - "strconv" + "net" + // "fmt" + "comm" + "config" + "encoding/json" + "io" + "io/ioutil" + "math/rand" + "miop" + "strconv" + "time" ) type RoomInfo struct { - Rid string `json:"rid"` - Ol int `json:"ol"` + Rid string `json:"rid"` + Ol int `json:"ol"` } type Dispatcher struct { - RoomList []RoomInfo `json:"rooms"` + RoomList []RoomInfo `json:"rooms"` } var Dis Dispatcher -func LoadRoom() { - roomBuf, err := ioutil.ReadFile(config.RLFile) - comm.CheckError(err, "read file "+ config.RLFile + " error") - //fmt.Printf("config: %s\n", roomBuf) - - err = json.Unmarshal(roomBuf, &Dis) - comm.CheckError(err, "json decode err") - //fmt.Printf("decode: %s\n", Dis) +func LoadRoom() { + roomBuf, err := ioutil.ReadFile(config.RLFile) + comm.CheckError(err, "read file "+config.RLFile+" error") + //fmt.Printf("config: %s\n", roomBuf) + + err = json.Unmarshal(roomBuf, &Dis) + comm.CheckError(err, "json decode err") + //fmt.Printf("decode: %s\n", Dis) } func (dis *Dispatcher) GetAllStats() { - for index, roomInfo := range dis.RoomList { - ol, err := GetRoomStats(roomInfo.Rid) - if nil != err { - comm.Log("get romm:%s err:%s", roomInfo.Rid, err.Error()) - } - //fmt.Printf("index: %d, rid: %s, ol: %d", index, roomInfo.Rid, ol) - dis.RoomList[index].Ol = ol - } + for index, roomInfo := range dis.RoomList { + ol, err := GetRoomStats(roomInfo.Rid) + if nil != err { + comm.Log("get romm:%s err:%s", roomInfo.Rid, err.Error()) + } + //fmt.Printf("index: %d, rid: %s, ol: %d", index, roomInfo.Rid, ol) + dis.RoomList[index].Ol = ol + } } func (dis *Dispatcher) GetRoom() string { - sum := 0 - - for _, roomInfo := range dis.RoomList { - //fmt.Printf("rid: %s ol: %d", roomInfo.Rid, roomInfo.Ol) - if roomInfo.Ol < 0 { - continue; - } - sum += roomInfo.Ol - } - - if sum < 1 { - return "" - } - r := rand.New(rand.NewSource(time.Now().UnixNano())) - rand := r.Intn(sum) - s := 0 - //fmt.Printf("rand:%d, sum:%d\n", rand, sum) - - for _, roomInfo := range dis.RoomList { - if roomInfo.Ol < 0 { - continue; - } - s+=roomInfo.Ol - if rand < s { - //fmt.Printf("success s: %d, rand: %d\n", s, rand) - return roomInfo.Rid - } - } - - return "" + sum := 0 + + for _, roomInfo := range dis.RoomList { + //fmt.Printf("rid: %s ol: %d", roomInfo.Rid, roomInfo.Ol) + if roomInfo.Ol < 0 { + continue + } + sum += roomInfo.Ol + } + + if sum < 1 { + return "" + } + r := rand.New(rand.NewSource(time.Now().UnixNano())) + rand := r.Intn(sum) + s := 0 + //fmt.Printf("rand:%d, sum:%d\n", rand, sum) + + for _, roomInfo := range dis.RoomList { + if roomInfo.Ol < 0 { + continue + } + s += roomInfo.Ol + if rand < s { + //fmt.Printf("success s: %d, rand: %d\n", s, rand) + return roomInfo.Rid + } + } + + return "" } func GetRoomStats(rid string) (int, error) { - conn, err := net.Dial("tcp", rid) - if nil != err { - return -1, err - } - defer conn.Close() - - b := miop.MiopPack("token:"+config.AdminToken+"\nu:"+config.AdminUid, []byte("")) - //fmt.Printf("send len: %d\n", len(b)) - _, errr := conn.Write(b) - if nil != errr { - return -1, err - } - buf := make([]byte, config.BuffLen) - for { - rlen, err := conn.Read(buf[0:]) - if nil != err { - if err == io.EOF { - println("receive end") - break - } else { - println("server close\n") - } - } - datas, _ := miop.MiopUnpack(buf[:rlen]) - for _, data := range datas { - prop := data.ParseProp() - ol := prop.GetPropKey("ol") - if "" == ol { - continue - } - return strconv.Atoi(ol) - } - } - - return -1, nil + conn, err := net.Dial("tcp", rid) + if nil != err { + return -1, err + } + defer conn.Close() + + b := miop.MiopPack("token:"+config.AdminToken+"\nu:"+config.AdminUid, []byte("")) + //fmt.Printf("send len: %d\n", len(b)) + _, errr := conn.Write(b) + if nil != errr { + return -1, err + } + buf := make([]byte, config.BuffLen) + for { + rlen, err := conn.Read(buf[0:]) + if nil != err { + if err == io.EOF { + println("receive end") + break + } else { + println("server close\n") + } + } + datas, _ := miop.MiopUnpack(buf[:rlen]) + for _, data := range datas { + prop := data.ParseProp() + ol := prop.GetPropKey("ol") + if "" == ol { + continue + } + return strconv.Atoi(ol) + } + } + + return -1, nil } diff --git a/dispatcher_server/server.go b/dispatcher_server/server.go index dba5299..bcc19e0 100644 --- a/dispatcher_server/server.go +++ b/dispatcher_server/server.go @@ -1,37 +1,35 @@ package main import ( - "server" - "dispatcher" - "net" - "config" - "time" - "comm" - "fmt" + "comm" + "config" + "dispatcher" + "fmt" + "net" + "server" + "time" ) func main() { - dispatcher.LoadRoom() - fmt.Printf("%s", dispatcher.Dis) - go func() { - for { - dispatcher.Dis.GetAllStats() - time.Sleep(3 * time.Second) - } - }() - ln := server.Start(config.DispatcherHost) - for { - conn, err := ln.Accept() - comm.CheckError(err, "accept error") - go handlerConn(conn) - } + dispatcher.LoadRoom() + fmt.Printf("%s", dispatcher.Dis) + go func() { + for { + dispatcher.Dis.GetAllStats() + time.Sleep(3 * time.Second) + } + }() + ln := server.Start(config.DispatcherHost) + for { + conn, err := ln.Accept() + comm.CheckError(err, "accept error") + go handlerConn(conn) + } } - func handlerConn(conn net.Conn) { - defer conn.Close() - rid := dispatcher.Dis.GetRoom() - fmt.Printf("get rid: %s\n", rid) - conn.Write([]byte(rid)) + defer conn.Close() + rid := dispatcher.Dis.GetRoom() + fmt.Printf("get rid: %s\n", rid) + conn.Write([]byte(rid)) } - diff --git a/entity/user.go b/entity/user.go index 87d79a4..cf20dea 100644 --- a/entity/user.go +++ b/entity/user.go @@ -1,73 +1,73 @@ package entity import ( - "comm" - "encoding/json" - "config" + "comm" + "config" + "encoding/json" ) type UserEntity struct { - Uid string - Ip string - CityCode int - CreateTime int64 - LoginTime int64 - LastTime int64 - Room string + Uid string + Ip string + CityCode int + CreateTime int64 + LoginTime int64 + LastTime int64 + Room string } -func GetEntity() *UserEntity{ - return &UserEntity{"", "", 0, 0, 0, 0, config.Room} +func GetEntity() *UserEntity { + return &UserEntity{"", "", 0, 0, 0, 0, config.Room} } type UserFilter struct { - CityCode int - LastTime int64 + CityCode int + LastTime int64 } func (ue *UserEntity) Save() bool { - data, err := json.Marshal(ue) - comm.CheckError(err, "ue json encode error") - return comm.SaveData(ue.Uid, data) + data, err := json.Marshal(ue) + comm.CheckError(err, "ue json encode error") + return comm.SaveData(ue.Uid, data) } func GetUser(uid string) *UserEntity { - data := comm.GetData(uid) - if nil == data { - return nil - } + data := comm.GetData(uid) + if nil == data { + return nil + } - var user UserEntity - err := json.Unmarshal(data, &user) + var user UserEntity + err := json.Unmarshal(data, &user) - if nil != err { - return nil - } + if nil != err { + return nil + } - return &user + return &user } func AddUser(uid, ip string, cc int, ct, lt, lat int64) *UserEntity { - user := &UserEntity{uid, ip, cc, ct, lt, lat, config.Room} - if user.Save() { - return user - } + user := &UserEntity{uid, ip, cc, ct, lt, lat, config.Room} + if user.Save() { + return user + } - return nil + return nil } func (uf *UserFilter) CheckFilter(ue UserEntity) bool { - if uf.CityCode > 0 { - if uf.CityCode != ue.CityCode { - return false - } - } - if uf.LastTime > 0 { - if uf.LastTime < ue.LastTime { - return false - } - } + if uf.CityCode > 0 { + if uf.CityCode != ue.CityCode { + return false + } + } + if uf.LastTime > 0 { + if uf.LastTime < ue.LastTime { + return false + } + } - return true + return true } diff --git a/miop/miop.go b/miop/miop.go index 727ba34..ce0cae3 100644 --- a/miop/miop.go +++ b/miop/miop.go @@ -1,253 +1,253 @@ package miop import ( - "bytes" - "comm" - "config" - "encoding/binary" - "encoding/json" - "entity" - "strconv" - "strings" + "bytes" + "comm" + "config" + "encoding/binary" + "encoding/json" + "entity" + "strconv" + "strings" ) type MiopData struct { - Id uint32 - Bl uint32 - Msg string + Id uint32 + Bl uint32 + Msg string } type MiopProp struct { - Prop map[string]string + Prop map[string]string } type Miop struct { - Ve uint16 - Pl uint16 - Dl uint32 - Prop []byte - Data []byte + Ve uint16 + Pl uint16 + Dl uint32 + Prop []byte + Data []byte } func MiopDataPack(id uint32, content string) []byte { - buf := new(bytes.Buffer) - var data = []interface{}{ - id, - uint32(len(content)), - []byte(content), - } - for i, v := range data { - err := binary.Write(buf, binary.BigEndian, v) - if err != nil { - comm.CheckError(err, "miopdata binary.Write failed:"+string(i)) - } - } - - return buf.Bytes() + buf := new(bytes.Buffer) + var data = []interface{}{ + id, + uint32(len(content)), + []byte(content), + } + for i, v := range data { + err := binary.Write(buf, binary.BigEndian, v) + if err != nil { + comm.CheckError(err, "miopdata binary.Write failed:"+string(i)) + } + } + + return buf.Bytes() } func MiopPack(prop string, msg []byte) []byte { - buf := new(bytes.Buffer) - var data = []interface{}{ - uint16(0), - uint16(len(prop)), - uint32(len(msg)), - []byte(prop), - msg, - } - - for i, v := range data { - err := binary.Write(buf, binary.BigEndian, v) - if err != nil { - comm.CheckError(err, "binary.Write failed:"+string(i)) - } - } - - return buf.Bytes() + buf := new(bytes.Buffer) + var data = []interface{}{ + uint16(0), + uint16(len(prop)), + uint32(len(msg)), + []byte(prop), + msg, + } + + for i, v := range data { + err := binary.Write(buf, binary.BigEndian, v) + if err != nil { + comm.CheckError(err, "binary.Write failed:"+string(i)) + } + } + + return buf.Bytes() } func MiopUnpack(b []byte) ([]*Miop, []byte) { - blen := len(b) - mlen := blen - comm.Debug("blen: %d, mlen: %d\n", blen, mlen) - buff := bytes.NewReader(b) - miops := make([]*Miop, 0) - more := make([]byte, 0) - for { - if blen <= 0 { - break - } - var ve uint16 - err := binary.Read(buff, binary.BigEndian, &ve) - comm.CheckError(err, "read ve error") - var pl uint16 - err = binary.Read(buff, binary.BigEndian, &pl) - comm.CheckError(err, "read pl error") - var dl uint32 - err = binary.Read(buff, binary.BigEndian, &dl) - comm.CheckError(err, "read dl error") - msgLen := int(8 + uint32(pl) + dl) - comm.Debug("pl:%d, dl:%d, msgLen: %d, blen: %d\n", pl, dl, msgLen, blen) - if blen < msgLen { - break - } - prop := make([]byte, pl) - err = binary.Read(buff, binary.BigEndian, &prop) - comm.CheckError(err, "read prop error") - data := make([]byte, dl) - err = binary.Read(buff, binary.BigEndian, &data) - comm.CheckError(err, "read data error") - - miops = append(miops, &Miop{ve, pl, dl, prop, data}) - - blen -= msgLen - - } - - if blen > 0 { - ml := mlen - blen - more = b[ml:] - } - - return miops, more + blen := len(b) + mlen := blen + comm.Debug("blen: %d, mlen: %d\n", blen, mlen) + buff := bytes.NewReader(b) + miops := make([]*Miop, 0) + more := make([]byte, 0) + for { + if blen <= 0 { + break + } + var ve uint16 + err := binary.Read(buff, binary.BigEndian, &ve) + comm.CheckError(err, "read ve error") + var pl uint16 + err = binary.Read(buff, binary.BigEndian, &pl) + comm.CheckError(err, "read pl error") + var dl uint32 + err = binary.Read(buff, binary.BigEndian, &dl) + comm.CheckError(err, "read dl error") + msgLen := int(8 + uint32(pl) + dl) + comm.Debug("pl:%d, dl:%d, msgLen: %d, blen: %d\n", pl, dl, msgLen, blen) + if blen < msgLen { + break + } + prop := make([]byte, pl) + err = binary.Read(buff, binary.BigEndian, &prop) + comm.CheckError(err, "read prop error") + data := make([]byte, dl) + err = binary.Read(buff, binary.BigEndian, &data) + comm.CheckError(err, "read data error") + + miops = append(miops, &Miop{ve, pl, dl, prop, data}) + + blen -= msgLen + + } + + if blen > 0 { + ml := mlen - blen + more = b[ml:] + } + + return miops, more } func (miop *Miop) ParseProp() *MiopProp { - if miop.Pl < 1 { - return nil - } - arrs := bytes.Split(miop.Prop, []byte("\n")) - propMap := make(map[string]string) - for _, v := range arrs { - arr := bytes.Split(v, []byte(":")) - if len(arr) > 1 { - propMap[string(arr[0])] = string(arr[1]) - } - } - - return &MiopProp{propMap} + if miop.Pl < 1 { + return nil + } + arrs := bytes.Split(miop.Prop, []byte("\n")) + propMap := make(map[string]string) + for _, v := range arrs { + arr := bytes.Split(v, []byte(":")) + if len(arr) > 1 { + propMap[string(arr[0])] = string(arr[1]) + } + } + + return &MiopProp{propMap} } func (prop *MiopProp) GetPropKey(key string) string { - if nil == prop { - return "" - } - val, ok := prop.Prop[key] - if ok { - return val - } - return "" + if nil == prop { + return "" + } + val, ok := prop.Prop[key] + if ok { + return val + } + return "" } -func (prop *MiopProp) GetLastId() (int) { - val := prop.GetPropKey("lid") - if "" == val { - return -1 - } - - id, _ := strconv.Atoi(val) - return id +func (prop *MiopProp) GetLastId() int { + val := prop.GetPropKey("lid") + if "" == val { + return -1 + } + + id, _ := strconv.Atoi(val) + return id } func (prop *MiopProp) GetUser() (string, string) { - val := prop.GetPropKey("u") - if "" == val { - return "", "" - } + val := prop.GetPropKey("u") + if "" == val { + return "", "" + } - infos := strings.Split(val, "@") + infos := strings.Split(val, "@") - u := infos[0] + u := infos[0] - p := "" + p := "" - if len(infos) > 1 { - p = infos[1] - } - return u, p + if len(infos) > 1 { + p = infos[1] + } + return u, p } func (miop *Miop) ParseData() []*MiopData { - if miop.Dl < 1 { - return nil - } - buff := bytes.NewReader(miop.Data) - arr := make([]*MiopData, 0) - dlen := len(miop.Data) - for { - if dlen <= 0 { - break - } - var id uint32 - err := binary.Read(buff, binary.BigEndian, &id) - comm.CheckError(err, "read id error") - if id <= 0 { - break - } - - var dl uint32 - err = binary.Read(buff, binary.BigEndian, &dl) - comm.CheckError(err, "read dl error") - var content = make([]byte, dl) - err = binary.Read(buff, binary.BigEndian, &content) - comm.CheckError(err, "read content error") - - arr = append(arr, &MiopData{id, dl, string(content)}) - - dlen -= int(8 + dl) - } - - return arr + if miop.Dl < 1 { + return nil + } + buff := bytes.NewReader(miop.Data) + arr := make([]*MiopData, 0) + dlen := len(miop.Data) + for { + if dlen <= 0 { + break + } + var id uint32 + err := binary.Read(buff, binary.BigEndian, &id) + comm.CheckError(err, "read id error") + if id <= 0 { + break + } + + var dl uint32 + err = binary.Read(buff, binary.BigEndian, &dl) + comm.CheckError(err, "read dl error") + var content = make([]byte, dl) + err = binary.Read(buff, binary.BigEndian, &content) + comm.CheckError(err, "read content error") + + arr = append(arr, &MiopData{id, dl, string(content)}) + + dlen -= int(8 + dl) + } + + return arr } func (prop *MiopProp) CheckToken() bool { - token := prop.GetPropKey("token") - if "" == token { - comm.Log("token empty") - return false - } + token := prop.GetPropKey("token") + if "" == token { + comm.Log("token empty") + return false + } - if token == config.AdminToken { - return true - } + if token == config.AdminToken { + return true + } - u := prop.GetPropKey("u") - if "" == u { - comm.Log("u empty") - return false - } + u := prop.GetPropKey("u") + if "" == u { + comm.Log("u empty") + return false + } - checkToken := comm.GetMd5(u + config.Key) + checkToken := comm.GetMd5(u + config.Key) - comm.Log("check token:%s checkToken: %s", token, checkToken) - return token == checkToken + comm.Log("check token:%s checkToken: %s", token, checkToken) + return token == checkToken } func (prop *MiopProp) GetFilter() *entity.UserFilter { - filter := prop.GetPropKey("filter") + filter := prop.GetPropKey("filter") - if "" == filter { - return nil - } + if "" == filter { + return nil + } - var userfilter entity.UserFilter + var userfilter entity.UserFilter - err := json.Unmarshal([]byte(filter), &userfilter) + err := json.Unmarshal([]byte(filter), &userfilter) - if nil != err { - return nil - } + if nil != err { + return nil + } - return &userfilter + return &userfilter } func ParseMsgList(msgs [][]byte) [][]byte { - msgpack := make([][]byte, 0) - for _, msg := range msgs { - arrs := bytes.Split(msg, []byte("@$@")) - if len(arrs) > 1 { - id, _ := strconv.Atoi(string(arrs[0])) - msgpack = append(msgpack, MiopDataPack(uint32(id), string(arrs[1]))) - } - } - return msgpack + msgpack := make([][]byte, 0) + for _, msg := range msgs { + arrs := bytes.Split(msg, []byte("@$@")) + if len(arrs) > 1 { + id, _ := strconv.Atoi(string(arrs[0])) + msgpack = append(msgpack, MiopDataPack(uint32(id), string(arrs[1]))) + } + } + return msgpack } diff --git a/push/push.go b/push/push.go index a84fc5b..8c044c6 100644 --- a/push/push.go +++ b/push/push.go @@ -1,20 +1,20 @@ package main import ( - "comm" - "config" - "server" - "signal" + "comm" + "config" + "server" + "signal" ) func main() { - config.Init() - go signal.Start() - go server.BoardCast() - ln := server.Start(config.HostAndPort) - for { - conn, err := ln.Accept() - comm.CheckError(err, "Accept error") - go server.HandlerConn(conn) - } + config.Init() + go signal.Start() + go server.BoardCast() + ln := server.Start(config.HostAndPort) + for { + conn, err := ln.Accept() + comm.CheckError(err, "Accept error") + go server.HandlerConn(conn) + } } diff --git a/server/server.go b/server/server.go index 80d39c9..6ba063e 100644 --- a/server/server.go +++ b/server/server.go @@ -1,16 +1,16 @@ package server import ( - "net" - "comm" - "sync" - "os" - "time" - "config" - "user" - "entity" - "miop" - "bytes" + "bytes" + "comm" + "config" + "entity" + "miop" + "net" + "os" + "sync" + "time" + "user" ) var runing = true @@ -19,108 +19,107 @@ var connCount = 0 var l sync.Mutex func Stop() { - runing = false - comm.Log("server stoping") - - if !sending { - comm.Log("server stop") - os.Exit(-1) - } + runing = false + comm.Log("server stoping") + + if !sending { + comm.Log("server stop") + os.Exit(-1) + } } func IsRuning() bool { - return runing + return runing } func SendStart() { - sending = true - comm.Log("sending....") + sending = true + comm.Log("sending....") } func SendEnd() { - sending = false - comm.Log("send stop....") - - if !runing { - comm.Log("server stop") - os.Exit(-1) - } + sending = false + comm.Log("send stop....") + + if !runing { + comm.Log("server stop") + os.Exit(-1) + } } func Start(hostAndPort string) net.Listener { - listener, err := net.Listen("tcp", hostAndPort) - comm.CheckError(err, "Listen error") - comm.Log("Listening to: %s", listener.Addr().String()) - return listener + listener, err := net.Listen("tcp", hostAndPort) + comm.CheckError(err, "Listen error") + comm.Log("Listening to: %s", listener.Addr().String()) + return listener } func HandlerConn(conn net.Conn) { - defer func(conn net.Conn) { - DelConnCount() - conn.Close() - }(conn) - - if !IsRuning() { - return - } - - c := make(chan []byte, config.BuffLen) - u := &user.User{conn, c, "", nil, time.Now().Unix(), entity.GetEntity()} - go func() { - for { - time.Sleep(3 * time.Second) - if !u.CheckHeartBeat() { - u.Conn.Close() - return - } - } - return - }() - AddConnCount() - for { - suc := u.PutChan() - if false == suc { - break - } - } - - return + defer func(conn net.Conn) { + DelConnCount() + conn.Close() + }(conn) + + if !IsRuning() { + return + } + + c := make(chan []byte, config.BuffLen) + u := &user.User{conn, c, "", nil, time.Now().Unix(), entity.GetEntity()} + go func() { + for { + time.Sleep(3 * time.Second) + if !u.CheckHeartBeat() { + u.Conn.Close() + return + } + } + return + }() + AddConnCount() + for { + suc := u.PutChan() + if false == suc { + break + } + } + + return } func AddConnCount() { - l.Lock() - defer l.Unlock() - connCount++ - user.ConnCount++ + l.Lock() + defer l.Unlock() + connCount++ + user.ConnCount++ } func DelConnCount() { - l.Lock() - defer l.Unlock() - connCount-- - user.ConnCount-- + l.Lock() + defer l.Unlock() + connCount-- + user.ConnCount-- } func GetConnCount() int { - return connCount + return connCount } func BoardCast() { - for { - time.Sleep(3 * time.Second) - sid := comm.GetSendMaxId() - msg := comm.GetMsgList(sid) - if nil == msg { - continue - } - SendStart() - //comm.Log("send boardcast %s", msg) - sendMsg := miop.ParseMsgList(msg) - b := miop.MiopPack("", bytes.Join(sendMsg, []byte(""))) - //comm.Log("packmsg %x len %d", b, len(b)) - user.BoardCast(b) - comm.SetSendMaxId() - SendEnd() - } + for { + time.Sleep(3 * time.Second) + sid := comm.GetSendMaxId() + msg := comm.GetMsgList(sid) + if nil == msg { + continue + } + SendStart() + //comm.Log("send boardcast %s", msg) + sendMsg := miop.ParseMsgList(msg) + b := miop.MiopPack("", bytes.Join(sendMsg, []byte(""))) + //comm.Log("packmsg %x len %d", b, len(b)) + user.BoardCast(b) + comm.SetSendMaxId() + SendEnd() + } } - diff --git a/signal/signal.go b/signal/signal.go index c2e8d56..5f745f4 100644 --- a/signal/signal.go +++ b/signal/signal.go @@ -1,63 +1,62 @@ package signal import ( - "os" - osignal "os/signal" - "syscall" - "server" - "comm" + "comm" + "os" + osignal "os/signal" + "server" + "syscall" ) -type sigHandler func (s os.Signal, arg interface{}) +type sigHandler func(s os.Signal, arg interface{}) type sigSet struct { - m map[os.Signal]sigHandler + m map[os.Signal]sigHandler } func sigInit() *sigSet { - sig := new(sigSet) - sig.m = make(map[os.Signal]sigHandler) - return sig + sig := new(sigSet) + sig.m = make(map[os.Signal]sigHandler) + return sig } func (ss *sigSet) add(s os.Signal, handler sigHandler) { - if _, ok := ss.m[s]; !ok { - comm.Log("add signal %d", s) - ss.m[s] = handler - } + if _, ok := ss.m[s]; !ok { + comm.Log("add signal %d", s) + ss.m[s] = handler + } } func (ss *sigSet) handler(s os.Signal, arg interface{}) bool { - if _, ok := ss.m[s]; ok { - ss.m[s](s, arg) - return true - } - - return false + if _, ok := ss.m[s]; ok { + ss.m[s](s, arg) + return true + } + + return false } func Start() { - comm.Log("signal start") - ss := sigInit() - comm.Log("signal init") - handler := func(s os.Signal, arg interface{}){ - comm.Log("receive sig %d", s) - server.Stop() - } - ss.add(syscall.SIGHUP, handler) - - for { - c := make(chan os.Signal) - var sigs []os.Signal - for sig := range ss.m { - sigs = append(sigs, sig) - } - - osignal.Notify(c) - - sig := <-c - - ss.handler(sig, nil) - } + comm.Log("signal start") + ss := sigInit() + comm.Log("signal init") + handler := func(s os.Signal, arg interface{}) { + comm.Log("receive sig %d", s) + server.Stop() + } + ss.add(syscall.SIGHUP, handler) + + for { + c := make(chan os.Signal) + var sigs []os.Signal + for sig := range ss.m { + sigs = append(sigs, sig) + } + + osignal.Notify(c) + + sig := <-c + + ss.handler(sig, nil) + } } - diff --git a/user/user.go b/user/user.go index e60df35..b5f76b1 100644 --- a/user/user.go +++ b/user/user.go @@ -1,191 +1,191 @@ package user import ( - "comm" - "config" - "entity" - "fmt" - "miop" - "net" - "time" - "strconv" - "bytes" + "bytes" + "comm" + "config" + "entity" + "fmt" + "miop" + "net" + "strconv" + "time" ) var ( - userlist = make([]User, 1000) - ConnCount =0 + userlist = make([]User, 1000) + ConnCount = 0 ) type User struct { - Conn net.Conn - C chan []byte - Uid string - More []byte - LastTime int64 - Ue *entity.UserEntity + Conn net.Conn + C chan []byte + Uid string + More []byte + LastTime int64 + Ue *entity.UserEntity } var UserList = make(map[string]*User) func CheckUserExits(uid string) bool { - _, ok := UserList[uid] - if ok { - return true - } - return false + _, ok := UserList[uid] + if ok { + return true + } + return false } func AddUser(u *User) *User { - if CheckUserExits(u.Uid) { - return nil - } + if CheckUserExits(u.Uid) { + return nil + } - comm.Log("user %s add", u.Uid) + comm.Log("user %s add", u.Uid) - //save to db - - user := entity.GetUser(u.Uid) + //save to db - if nil == user { - now := time.Now().Unix() - entity.AddUser(u.Uid, u.Conn.RemoteAddr().String(), 1000, now, now, now) - } + user := entity.GetUser(u.Uid) - return SetUser(u) + if nil == user { + now := time.Now().Unix() + entity.AddUser(u.Uid, u.Conn.RemoteAddr().String(), 1000, now, now, now) + } + + return SetUser(u) } func SetUser(u *User) *User { - UserList[u.Uid] = u - return u + UserList[u.Uid] = u + return u } func DelUser(uid string) bool { - if CheckUserExits(uid) { - delete(UserList, uid) - return true - } - return false + if CheckUserExits(uid) { + delete(UserList, uid) + return true + } + return false } func GetUser(uid string) *User { - u, ok := UserList[uid] - if ok { - return u - } - return nil + u, ok := UserList[uid] + if ok { + return u + } + return nil } func (u *User) PutChan() bool { - buf := make([]byte, config.BuffLen) - rlen, err := u.Conn.Read(buf) - fmt.Printf("read len: %d", rlen) - if nil == err { - comm.Debug("conn read %x", buf[:rlen]) - if len(buf) > 0 { - u.C <- buf[:rlen] - } - u.UpTime() - return u.OnRecieve() - } else { - comm.Log("close connection: %s", u.Conn.RemoteAddr().String()) - u.Conn.Close() - DelUser(u.Uid) - return false - } + buf := make([]byte, config.BuffLen) + rlen, err := u.Conn.Read(buf) + fmt.Printf("read len: %d", rlen) + if nil == err { + comm.Debug("conn read %x", buf[:rlen]) + if len(buf) > 0 { + u.C <- buf[:rlen] + } + u.UpTime() + return u.OnRecieve() + } else { + comm.Log("close connection: %s", u.Conn.RemoteAddr().String()) + u.Conn.Close() + DelUser(u.Uid) + return false + } } func (u *User) OnRecieve() bool { - select { - case msg := <-u.C: - return u.parseMsg(msg) - default: - return u.CheckHeartBeat() + select { + case msg := <-u.C: + return u.parseMsg(msg) + default: + return u.CheckHeartBeat() - } + } } func (u *User) CheckHeartBeat() bool { - comm.Log("hb check %s", u.Uid) - return u.LastTime+config.HeartbeatTime > time.Now().Unix() + comm.Log("hb check %s", u.Uid) + return u.LastTime+config.HeartbeatTime > time.Now().Unix() } func (u *User) UpTime() { - u.LastTime = time.Now().Unix() + u.LastTime = time.Now().Unix() } func (u *User) Send(msg []byte) { - wrote, err := u.Conn.Write(msg) - comm.CheckError(err, "write: "+string(wrote)+" bytes") - comm.Log("send to %s : %x", u.Uid, msg) + wrote, err := u.Conn.Write(msg) + comm.CheckError(err, "write: "+string(wrote)+" bytes") + comm.Log("send to %s : %x", u.Uid, msg) } func (u *User) SendOfflineMsg(id int) { - msgs := comm.GetMsgList(id) - if len(msgs) > 0 { - comm.Log("uid:%s get %d offline msg", u.Uid, len(msgs)) - msgs = miop.ParseMsgList(msgs) - msg := miop.MiopPack("", bytes.Join(msgs, []byte(""))) - u.Send(msg) - } - return + msgs := comm.GetMsgList(id) + if len(msgs) > 0 { + comm.Log("uid:%s get %d offline msg", u.Uid, len(msgs)) + msgs = miop.ParseMsgList(msgs) + msg := miop.MiopPack("", bytes.Join(msgs, []byte(""))) + u.Send(msg) + } + return } func (u *User) parseMsg(msg []byte) bool { - comm.Log("parsemsg %x len %d", msg, len(msg)) - if len(u.More) > 0 { - //msg = comm.MergeByte(u.More, msg) - msg = append(u.More, msg...) - } - datas, more := miop.MiopUnpack(msg) - if len(more) > 0 { - u.More = more - } - comm.Log("unpack %d", len(datas)) - result := true - for _, data := range datas { - if "" == u.Uid { //first connect - prop := data.ParseProp() - if ok := prop.CheckToken(); !ok { - comm.Log("check token fail") - result = false - continue - } - uname, p := prop.GetUser() - if checkAdmin(uname) { - token := prop.GetPropKey("token") - if token == config.AdminToken { - u.Send(miop.MiopPack("ol:"+strconv.Itoa(ConnCount), []byte(""))) - } - return false - } - if "" != uname { - u.Uid = uname + ":" + p - AddUser(u) - lastId := prop.GetLastId() - u.SendOfflineMsg(lastId) - result = true - continue - } - } else { - u.Send(msg) - } - } - - return result -} - -func checkAdmin(u string) bool{ - return u == config.AdminUid + comm.Log("parsemsg %x len %d", msg, len(msg)) + if len(u.More) > 0 { + //msg = comm.MergeByte(u.More, msg) + msg = append(u.More, msg...) + } + datas, more := miop.MiopUnpack(msg) + if len(more) > 0 { + u.More = more + } + comm.Log("unpack %d", len(datas)) + result := true + for _, data := range datas { + if "" == u.Uid { //first connect + prop := data.ParseProp() + if ok := prop.CheckToken(); !ok { + comm.Log("check token fail") + result = false + continue + } + uname, p := prop.GetUser() + if checkAdmin(uname) { + token := prop.GetPropKey("token") + if token == config.AdminToken { + u.Send(miop.MiopPack("ol:"+strconv.Itoa(ConnCount), []byte(""))) + } + return false + } + if "" != uname { + u.Uid = uname + ":" + p + AddUser(u) + lastId := prop.GetLastId() + u.SendOfflineMsg(lastId) + result = true + continue + } + } else { + u.Send(msg) + } + } + + return result +} + +func checkAdmin(u string) bool { + return u == config.AdminUid } func BoardCast(msg []byte) { - for _, u := range UserList { - comm.Log("send to %s", u.Uid) - u.parseMsg(msg) - } + for _, u := range UserList { + comm.Log("send to %s", u.Uid) + u.parseMsg(msg) + } } func GetConnCount() int { - return ConnCount + return ConnCount }