Skip to content

Commit

Permalink
Merge pull request #8 from yuuki/ipv4-active-open
Browse files Browse the repository at this point in the history
Print both active and passive flows with --ipv4 option
  • Loading branch information
yuuki authored Nov 24, 2019
2 parents 4233fc8 + a341424 commit 8c31e7b
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 146 deletions.
15 changes: 4 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,10 @@ Make ttracer run once.
### ttctl

```shell-session
$ ttctl --level 2 --dest-ipv4 10.0.0.21
10.0.0.21:80
└<-- 10.0.0.22:many ('nginx', pgid=2000, connections=30)
└<-- 10.0.0.23:many ('nginx', pgid=891, connections=30)
└<-- 10.0.0.24:many ('nginx', pgid=1002, connections=30)
└<-- 10.0.0.30:many ('python', pgid=1889 connections=1)
└<-- 10.0.0.31:many ('python', pgid=1998 connections=1)
└<-- 10.0.0.25:many (connections:30)
10.0.0.21:22
└<-- 10.0.0.100:many
$ ttctl --dbhost 10.0.0.20 --ipv4 10.0.0.10 10.0.0.10:80 (’nginx’, pgid=4656)
└<-- 10.0.0.11:many (’wrk’, pgid=5982) 10.0.0.10:80 (’nginx’, pgid=4656)
└--> 10.0.0.12:8080 (’python’, pgid=6111) 10.0.0.10:many (’fluentd’, pgid=2127)
└--> 10.0.0.13:24224 (’fluentd’, pgid=2001)
```

## License
Expand Down
73 changes: 39 additions & 34 deletions cmd/ttctl/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"
"log"
"net"
"strings"

"github.com/yuuki/transtracer/db"
"github.com/yuuki/transtracer/statik"
Expand Down Expand Up @@ -125,58 +124,64 @@ func (c *CLI) doIPv4(ipv4 string, depth int, opt *db.Opt) int {
return exitCodeErr
}
addr := net.ParseIP(ipv4)
portsbyaddr, err := db.FindListeningPortsByAddrs([]net.IP{addr})

// print thet flows of passive nodes
pflows, err := db.FindPassiveFlows([]net.IP{addr})
if err != nil {
log.Printf("find listening ports by addrs error: %v\n", err)
log.Printf("find active flows error: %v\n", err)
return exitCodeErr
}
for _, addrports := range portsbyaddr {
for _, addrport := range addrports {
fmt.Fprintf(c.outStream, "%s:%d ('%s', pgid=%d)\n", addrport.IPAddr, addrport.Port, addrport.Pname, addrport.Pgid)
if err := c.printIPv4(db, addrport, 1, depth); err != nil {
log.Printf("print dest ipv4 error: %v\n", err)
return exitCodeErr
}
}
for _, flows := range pflows {
pnode := flows[0].PassiveNode
fmt.Fprintf(c.outStream,
"%s:%d ('%s', pgid=%d)\n", pnode.IPAddr, pnode.Port, pnode.Pname, pnode.Pgid)

c.printPassiveFlows(flows)
}

// print the flows of active nodes
aflows, err := db.FindActiveFlows([]net.IP{addr})
if err != nil {
log.Printf("find active flows error: %v\n", err)
return exitCodeErr
}
for _, flows := range aflows {
anode := flows[0].ActiveNode
fmt.Fprintf(c.outStream,
"%s ('%s', pgid=%d)\n", anode.IPAddr, anode.Pname, anode.Pgid)

c.printActiveFlows(flows)
}

return exitCodeOK
}

func (c *CLI) printIPv4(db *db.DB, addrport *db.AddrPort, curDepth, depth int) error {
addrports, err := db.FindSourceByDestAddrAndPort(addrport.IPAddr, addrport.Port)
if err != nil {
return err
}
if len(addrports) == 0 {
return nil
}
indent := strings.Repeat("\t", curDepth-1)
curDepth++
depth--
for _, addrport := range addrports {
fmt.Fprint(c.outStream, indent)
fmt.Fprint(c.outStream, "└<-- ")
fmt.Fprint(c.outStream, addrport)
fmt.Fprintln(c.outStream)
if err := c.printIPv4(db, addrport, curDepth, depth); err != nil {
return err
}
func (c *CLI) printPassiveFlows(flows []*db.Flow) {
// No implementation of printing tree with depth > 1
for _, flow := range flows {
fmt.Fprintf(c.outStream, "└<-- %s\n", flow.ActiveNode)
}
}

func (c *CLI) printActiveFlows(flows []*db.Flow) {
// No implementation of printing tree with depth > 1
for _, flow := range flows {
fmt.Fprintf(c.outStream, "└--> %s\n", flow.PassiveNode)
}
return nil
}

var helpText = `Usage: ttctl [options]
ttctl is a CLI controller for mftracer system.
ttctl is a CLI controller for transtracer.
Options:
--create-schema create mftracer table schema for postgres
--create-schema create transtracer table schema for postgres
--dbuser postgres user
--dbpass postgres user password
--dbhost postgres host
--dbport postgres port
--dbname postgres database name
--dest-ipv4 filter by destination ipv4 address
--ipv4 print trees regarding the ipv4 address as a root node
--version, -v print version
--help, -h print help
`
201 changes: 139 additions & 62 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,114 +309,191 @@ func (db *DB) InsertOrUpdateHostFlows(flows []*tcpflow.HostFlow) error {
return nil
}

// AddrPort are IP addr and port.
type AddrPort struct {
IPAddr net.IP
Port int
Pgid int
Pname string
Connections int
// Node represents a minimum unit of a graph tree.
type Node struct {
IPAddr net.IP
Port int // 0 if active node
Pgid int // Process Group ID (Linux)
Pname string // Process Name (Linux)
}

func (a *AddrPort) String() string {
port := fmt.Sprintf("%d", a.Port)
if a.Port == 0 {
func (n *Node) String() string {
port := fmt.Sprintf("%d", n.Port)
if n.Port == 0 {
port = "many"
}
return fmt.Sprintf("%s:%s ('%s', pgid=%d, connections=%d)", a.IPAddr, port, a.Pname, a.Pgid, a.Connections)
return fmt.Sprintf("%s:%s ('%s', pgid=%d)",
n.IPAddr, port, n.Pname, n.Pgid)
}

// FindListeningPortsByAddrs find listening ports for multiple `addrs`.
func (db *DB) FindListeningPortsByAddrs(addrs []net.IP) (map[string][]*AddrPort, error) {
// Flows represents a flow between a active node and a passive node.
type Flow struct {
ActiveNode *Node
PassiveNode *Node
Connections int
}

// Flows represents a collection of flow.
type Flows map[string][]*Flow // flows group by

// FindPassiveFlows queries passive flows to CMDB by the slice of ipaddrs.
func (db *DB) FindPassiveFlows(addrs []net.IP) (Flows, error) {
ipv4s := make([]string, 0, len(addrs))
for _, addr := range addrs {
ipv4s = append(ipv4s, addr.String())
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rows, err := db.QueryContext(ctx, `
SELECT ipv4, port, pgid, pname FROM passive_nodes
INNER JOIN processes ON processes.process_id = passive_nodes.process_id
WHERE processes.ipv4 = ANY($1)
SELECT
DISTINCT ON (pipv4, pn.pname)
pn.ipv4 AS pipv4,
pn.pname AS ppname,
pn.port AS pport,
pn.pgid AS ppgid,
active_processes.ipv4 AS aipv4,
active_processes.pname AS apname,
active_processes.pgid AS apgid,
connections,
flows.updated AS updated
FROM flows
INNER JOIN active_nodes ON active_nodes.node_id = flows.source_node_id
INNER JOIN processes AS active_processes ON active_nodes.process_id = active_processes.process_id
INNER JOIN (
SELECT passive_nodes.node_id, passive_nodes.port, passive_processes.* FROM passive_nodes
INNER JOIN processes AS passive_processes ON passive_processes.process_id = passive_nodes.process_id
WHERE passive_processes.ipv4 = ANY($1)
) AS pn ON pn.node_id = flows.destination_node_id
ORDER BY pn.ipv4, pn.pname, flows.updated DESC
`, pq.Array(ipv4s))
if err == sql.ErrNoRows {
return map[string][]*AddrPort{}, nil
}
if err != nil {
return nil, xerrors.Errorf("query error: %v", err)
switch {
case err == sql.ErrNoRows:
return Flows{}, nil
case err != nil:
return Flows{}, xerrors.Errorf("find passive flows query error: %v", err)
}
defer rows.Close()

portsbyaddr := make(map[string][]*AddrPort)
flows := make(Flows, 0)
for rows.Next() {
var (
addr string
port int
pgid int
pname string
pipv4 string
ppname string
pport int
ppgid int
aipv4 string
apname string
apgid int
connections int
updated time.Time
)
if err := rows.Scan(&addr, &port, &pgid, &pname); err != nil {
return nil, xerrors.Errorf("query error: %v", err)
}
if port == 0 { // port == 0 means 'many'
continue
if err := rows.Scan(
&pipv4, &ppname, &pport, &ppgid, &aipv4, &apname, &apgid, &connections, &updated,
); err != nil {
return nil, xerrors.Errorf("rows scan error: %v", err)
}
portsbyaddr[addr] = append(portsbyaddr[addr], &AddrPort{
IPAddr: net.ParseIP(addr),
Port: port,
Pgid: pgid,
Pname: pname,
key := fmt.Sprintf("%s-%s", pipv4, ppname)
flows[key] = append(flows[key], &Flow{
ActiveNode: &Node{
IPAddr: net.ParseIP(aipv4),
Port: 0,
Pgid: apgid,
Pname: apname,
},
PassiveNode: &Node{
IPAddr: net.ParseIP(pipv4),
Port: pport,
Pgid: ppgid,
Pname: ppname,
},
Connections: connections,
})
}
return portsbyaddr, nil
if err := rows.Err(); err != nil {
return nil, xerrors.Errorf("rows error: %v", err)
}

return flows, nil
}

// FindSourceByDestAddrAndPort find source nodes.
func (db *DB) FindSourceByDestAddrAndPort(addr net.IP, port int) ([]*AddrPort, error) {
// FindActiveFlows queries active flows to CMDB by the slice of ipaddrs.
func (db *DB) FindActiveFlows(addrs []net.IP) (Flows, error) {
ipv4s := make([]string, 0, len(addrs))
for _, addr := range addrs {
ipv4s = append(ipv4s, addr.String())
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rows, err := db.QueryContext(ctx, `
SELECT
connections, flows.updated AS updated, processes.ipv4 AS source_ipv4, pn.port AS source_port, processes.pgid AS pgid, processes.pname AS pname
DISTINCT ON (aipv4, an.pname)
an.ipv4 AS aipv4,
an.pname AS apname,
passive_nodes.port AS pport,
an.pgid AS apgid,
passive_processes.ipv4 AS pipv4,
passive_processes.pname AS ppname,
passive_processes.pgid AS ppgid,
connections,
flows.updated AS updated
FROM flows
INNER JOIN active_nodes ON active_nodes.node_id = flows.source_node_id
INNER JOIN processes ON processes.process_id = active_nodes.process_id
INNER JOIN (
SELECT passive_nodes.node_id, passive_nodes.port FROM passive_nodes
INNER JOIN processes ON processes.process_id = passive_nodes.process_id
WHERE processes.ipv4 = $1 AND passive_nodes.port = $2
) AS pn ON flows.destination_node_id = pn.node_id
`, addr.String(), port)
INNER JOIN passive_nodes ON passive_nodes.node_id = flows.destination_node_id
INNER JOIN processes AS passive_processes ON passive_nodes.process_id = passive_processes.process_id
INNER JOIN (
SELECT node_id, active_processes.* FROM active_nodes
INNER JOIN processes AS active_processes ON active_processes.process_id = active_nodes.process_id
WHERE active_processes.ipv4 = ANY($1)
) AS an ON an.node_id = flows.source_node_id
ORDER BY an.ipv4, an.pname, flows.updated DESC
`, pq.Array(ipv4s))
switch {
case err == sql.ErrNoRows:
return []*AddrPort{}, nil
return Flows{}, nil
case err != nil:
return []*AddrPort{}, xerrors.Errorf("find source nodes error: %v", err)
return Flows{}, xerrors.Errorf("find active flows query error: %v", err)
}
defer rows.Close()
addrports := make([]*AddrPort, 0)

flows := make(Flows, 0)
for rows.Next() {
var (
aipv4 string
apname string
pport int
apgid int
pipv4 string
ppname string
ppgid int
connections int
updated time.Time
sipv4 string
sport int
spgid int
spname string
)
if err := rows.Scan(&connections, &updated, &sipv4, &sport, &spgid, &spname); err != nil {
if err := rows.Scan(&aipv4, &apname, &pport, &apgid, &pipv4, &ppname, &ppgid, &connections, &updated); err != nil {
return nil, xerrors.Errorf("rows scan error: %v", err)
}
addrports = append(addrports, &AddrPort{
IPAddr: net.ParseIP(sipv4),
Port: sport,
Pgid: spgid,
Pname: spname,
key := fmt.Sprintf("%s-%s", aipv4, apname)
flows[key] = append(flows[key], &Flow{
ActiveNode: &Node{
IPAddr: net.ParseIP(aipv4),
Port: 0,
Pgid: apgid,
Pname: apname,
},
PassiveNode: &Node{
IPAddr: net.ParseIP(pipv4),
Port: pport,
Pgid: ppgid,
Pname: ppname,
},
Connections: connections,
})
}
if err := rows.Err(); err != nil {
return nil, xerrors.Errorf("rows error: %v", err)
}
return addrports, nil

return flows, nil
}
Loading

0 comments on commit 8c31e7b

Please sign in to comment.