Skip to content

Commit

Permalink
Fix networkpolicy related antctl commands
Browse files Browse the repository at this point in the history
This commit contains numerous fixes for antctl, including the get ovsflows
and get networkpolicy commands:
1. Update the `antctl get networkpolicy` flag type=ANP to point the correct
   AdminNetworkPolicy type since ANP<->AntreaNetworkPolicy mapping has been
   deprecated.
2. Update the `antctl get ovsflows` command so that when dumping flows for
   a specific policy, a type is now required to eliminate ambiguity.
3. Since v1.13 the `antctl get ovsflows` command will not work properly for
   dumping networkpolicy flows. This is due to the matcher for dumping flows
   include priority, which is not supported by openvswitch. This PR fixes
   this issue.

Signed-off-by: Dyanngg <[email protected]>
  • Loading branch information
Dyanngg committed Jun 26, 2024
1 parent 538df0e commit 81da3d3
Show file tree
Hide file tree
Showing 12 changed files with 231 additions and 76 deletions.
16 changes: 2 additions & 14 deletions pkg/agent/apiserver/handlers/networkpolicy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,6 @@ func HandleFunc(aq agentquerier.AgentQuerier) http.HandlerFunc {
}
}

// From user shorthand input to cpv1beta1.NetworkPolicyType
var mapToNetworkPolicyType = map[string]cpv1beta.NetworkPolicyType{
"NP": cpv1beta.K8sNetworkPolicy,
"K8SNP": cpv1beta.K8sNetworkPolicy,
"ACNP": cpv1beta.AntreaClusterNetworkPolicy,
"ANNP": cpv1beta.AntreaNetworkPolicy,
"ANP": cpv1beta.AntreaNetworkPolicy,
}

// Create a Network Policy Filter from URL Query
func newFilterFromURLQuery(query url.Values) (*querier.NetworkPolicyQueryFilter, string, error) {
namespace, pod := query.Get("namespace"), query.Get("pod")
Expand All @@ -76,11 +67,8 @@ func newFilterFromURLQuery(query url.Values) (*querier.NetworkPolicyQueryFilter,
return nil, "", fmt.Errorf("namespace option should not be used with pod option")
}
}
strSourceType := strings.ToUpper(query.Get("type"))
npSourceType, ok := mapToNetworkPolicyType[strSourceType]
if strSourceType != "" && !ok {
return nil, "", fmt.Errorf("invalid policy source type. Valid values are K8sNP, ACNP, ANNP and ANP (deprecated)")
}
strSourceType := query.Get("type")
npSourceType := querier.NetworkPolicyTypeMap[strSourceType]
source := query.Get("source")
name := query.Get("name")
if name != "" && (source != "" || namespace != "" || pod != "" || strSourceType != "") {
Expand Down
42 changes: 29 additions & 13 deletions pkg/agent/apiserver/handlers/ovsflows/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"antrea.io/antrea/pkg/agent/apis"
"antrea.io/antrea/pkg/agent/openflow"
agentquerier "antrea.io/antrea/pkg/agent/querier"
cpv1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
binding "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/pkg/querier"
)
Expand All @@ -38,7 +39,7 @@ var (
)

func dumpMatchedFlows(aq agentquerier.AgentQuerier, flowKeys []string) ([]apis.OVSFlowResponse, error) {
resps := []apis.OVSFlowResponse{}
var resps []apis.OVSFlowResponse
for _, f := range flowKeys {
flowStr, err := aq.GetOVSCtlClient().DumpMatchedFlow(f)
if err != nil {
Expand All @@ -53,7 +54,7 @@ func dumpMatchedFlows(aq agentquerier.AgentQuerier, flowKeys []string) ([]apis.O
}

func dumpFlows(aq agentquerier.AgentQuerier, table uint8) ([]apis.OVSFlowResponse, error) {
resps := []apis.OVSFlowResponse{}
var resps []apis.OVSFlowResponse
var flowStrs []string
var err error
if table != binding.TableIDAll {
Expand Down Expand Up @@ -170,19 +171,23 @@ func getServiceFlows(aq agentquerier.AgentQuerier, serviceName, namespace string
return append(resps, groupResps...), nil
}

func getNetworkPolicyFlows(aq agentquerier.AgentQuerier, npName, namespace string) ([]apis.OVSFlowResponse, error) {
if len(aq.GetNetworkPolicyInfoQuerier().GetNetworkPolicies(&querier.NetworkPolicyQueryFilter{SourceName: npName, Namespace: namespace})) == 0 {
func getNetworkPolicyFlows(aq agentquerier.AgentQuerier, npName, namespace string, policyType cpv1beta.NetworkPolicyType) ([]apis.OVSFlowResponse, error) {
npFilter := &querier.NetworkPolicyQueryFilter{
SourceName: npName,
Namespace: namespace,
SourceType: policyType,
}
if len(aq.GetNetworkPolicyInfoQuerier().GetNetworkPolicies(npFilter)) == 0 {
// NetworkPolicy not found.
return nil, nil
}

flowKeys := aq.GetOpenflowClient().GetNetworkPolicyFlowKeys(npName, namespace)
flowKeys := aq.GetOpenflowClient().GetNetworkPolicyFlowKeys(npName, namespace, policyType)
return dumpMatchedFlows(aq, flowKeys)
}

func getTableNames(aq agentquerier.AgentQuerier) []apis.OVSFlowResponse {
resps := []apis.OVSFlowResponse{}
names := []string{}
func getTableNames() []apis.OVSFlowResponse {
var resps []apis.OVSFlowResponse
var names []string
for _, t := range getFlowTableList() {
names = append(names, t.GetName())
}
Expand All @@ -201,6 +206,7 @@ func HandleFunc(aq agentquerier.AgentQuerier) http.HandlerFunc {
pod := r.URL.Query().Get("pod")
service := r.URL.Query().Get("service")
networkPolicy := r.URL.Query().Get("networkpolicy")
policyType := r.URL.Query().Get("type")
namespace := r.URL.Query().Get("namespace")
table := r.URL.Query().Get("table")
groups := r.URL.Query().Get("groups")
Expand All @@ -214,16 +220,25 @@ func HandleFunc(aq agentquerier.AgentQuerier) http.HandlerFunc {
}

if tableNamesOnly {
resps = getTableNames(aq)
resps = getTableNames()
encodeResp()
return
}

if (pod != "" || service != "" || networkPolicy != "") && namespace == "" {
if (pod != "" || service != "") && namespace == "" {
http.Error(w, "namespace must be provided", http.StatusBadRequest)
return
}

if networkPolicy != "" {
if policyType == "" {
http.Error(w, "policy type must be provided with policy name", http.StatusBadRequest)
return
}
if querier.NamespaceScopedPolicyTypes.Has(policyType) && namespace == "" {
http.Error(w, "policy Namespace must be provided for policy type "+policyType, http.StatusBadRequest)
return
}
}
if pod == "" && service == "" && networkPolicy == "" && namespace == "" && table == "" && groups == "" {
resps, err = dumpFlows(aq, binding.TableIDAll)
} else if pod != "" {
Expand All @@ -236,7 +251,8 @@ func HandleFunc(aq agentquerier.AgentQuerier) http.HandlerFunc {
}
resps, err = getServiceFlows(aq, service, namespace)
} else if networkPolicy != "" {
resps, err = getNetworkPolicyFlows(aq, networkPolicy, namespace)
cpPolicyType := querier.NetworkPolicyTypeMap[policyType]
resps, err = getNetworkPolicyFlows(aq, networkPolicy, namespace, cpPolicyType)
} else if table != "" {
resps, err = getTableFlows(aq, table)
if err == nil && resps == nil {
Expand Down
68 changes: 53 additions & 15 deletions pkg/agent/apiserver/handlers/ovsflows/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type testCase struct {
test string
name string
namespace string
policyType cpv1beta.NetworkPolicyType
query string
expectedStatus int
resps []apis.OVSFlowResponse
Expand Down Expand Up @@ -177,35 +178,72 @@ func TestNetworkPolicyFlows(t *testing.T) {
test: "Existing NetworkPolicy",
name: "np1",
namespace: "ns1",
query: "?networkpolicy=np1&&namespace=ns1",
policyType: cpv1beta.K8sNetworkPolicy,
query: "?networkpolicy=np1&namespace=ns1&type=K8sNP",
expectedStatus: http.StatusOK,
},
{
test: "Non-existing NetworkPolicy",
name: "np2",
namespace: "ns2",
query: "?networkpolicy=np2&&namespace=ns2",
policyType: cpv1beta.K8sNetworkPolicy,
query: "?networkpolicy=np2&namespace=ns2&type=K8sNP",
expectedStatus: http.StatusNotFound,
},
{
test: "Existing ACNP",
name: "acnp1",
policyType: cpv1beta.AntreaClusterNetworkPolicy,
query: "?networkpolicy=acnp1&type=ACNP",
expectedStatus: http.StatusOK,
},
{
test: "Existing ANNP",
name: "annp1",
namespace: "default",
policyType: cpv1beta.AntreaNetworkPolicy,
query: "?networkpolicy=annp1&namespace=default&type=ANNP",
expectedStatus: http.StatusOK,
},
{
test: "ANNP bad request",
name: "annp2",
policyType: cpv1beta.AntreaNetworkPolicy,
query: "?networkpolicy=annp2&type=ANNP",
expectedStatus: http.StatusBadRequest,
},
{
test: "Existing ANP",
name: "anp1",
policyType: cpv1beta.AdminNetworkPolicy,
query: "?networkpolicy=anp1&type=ANP",
expectedStatus: http.StatusOK,
},
}
for i := range testcases {
tc := testcases[i]
npq := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
q := aqtest.NewMockAgentQuerier(ctrl)
q.EXPECT().GetNetworkPolicyInfoQuerier().Return(npq).Times(1)

if tc.expectedStatus != http.StatusNotFound {
ofc := oftest.NewMockClient(ctrl)
ovsctl := ovsctltest.NewMockOVSCtlClient(ctrl)
npq.EXPECT().GetNetworkPolicies(&querier.NetworkPolicyQueryFilter{SourceName: tc.name, Namespace: tc.namespace}).Return([]cpv1beta.NetworkPolicy{*testNetworkPolicy}).Times(1)
ofc.EXPECT().GetNetworkPolicyFlowKeys(tc.name, tc.namespace).Return(testFlowKeys).Times(1)
q.EXPECT().GetOpenflowClient().Return(ofc).Times(1)
q.EXPECT().GetOVSCtlClient().Return(ovsctl).Times(len(testFlowKeys))
for i := range testFlowKeys {
ovsctl.EXPECT().DumpMatchedFlow(testFlowKeys[i]).Return(testDumpFlows[i], nil).Times(1)
npFilter := &querier.NetworkPolicyQueryFilter{
SourceName: tc.name,
Namespace: tc.namespace,
SourceType: tc.policyType,
}
if tc.expectedStatus != http.StatusBadRequest {
q.EXPECT().GetNetworkPolicyInfoQuerier().Return(npq).Times(1)
if tc.expectedStatus == http.StatusOK {
ofc := oftest.NewMockClient(ctrl)
ovsctl := ovsctltest.NewMockOVSCtlClient(ctrl)
npq.EXPECT().GetNetworkPolicies(npFilter).Return([]cpv1beta.NetworkPolicy{*testNetworkPolicy}).Times(1)
ofc.EXPECT().GetNetworkPolicyFlowKeys(tc.name, tc.namespace, tc.policyType).Return(testFlowKeys).Times(1)
q.EXPECT().GetOpenflowClient().Return(ofc).Times(1)
q.EXPECT().GetOVSCtlClient().Return(ovsctl).Times(len(testFlowKeys))
for i := range testFlowKeys {
ovsctl.EXPECT().DumpMatchedFlow(testFlowKeys[i]).Return(testDumpFlows[i], nil).Times(1)
}
} else {
npq.EXPECT().GetNetworkPolicies(npFilter).Return(nil).Times(1)
}
} else {
npq.EXPECT().GetNetworkPolicies(&querier.NetworkPolicyQueryFilter{SourceName: tc.name, Namespace: tc.namespace}).Return(nil).Times(1)
}

runHTTPTest(t, &tc, q)
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ type Client interface {
// flows for a NetworkPolicy. Flows are grouped by policy rules, and duplicated
// entries can be added due to conjunctive match flows shared by multiple
// rules.
GetNetworkPolicyFlowKeys(npName, npNamespace string) []string
GetNetworkPolicyFlowKeys(npName, npNamespace string, npType v1beta2.NetworkPolicyType) []string

// ReassignFlowPriorities takes a list of priority updates, and update the actionFlows to replace
// the old priority with the desired one, for each priority update on that table.
Expand Down Expand Up @@ -445,7 +445,7 @@ func (c *client) addFlowsWithMultipleKeys(cache *flowCategoryCache, keyToFlows m
for _, flow := range flows {
msg := getFlowModMessage(flow, binding.AddMessage)
allMessages = append(allMessages, msg)
fCache[getFlowKey(msg)] = msg
fCache[getFlowModKey(msg)] = msg
}
flowCacheMap[flowCacheKey] = fCache
}
Expand Down Expand Up @@ -473,7 +473,7 @@ func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flow
for _, flow := range flows {
msg := getFlowModMessage(flow, binding.AddMessage)
messages = append(messages, msg)
fCache[getFlowKey(msg)] = msg
fCache[getFlowModKey(msg)] = msg
}
err = c.ofEntryOperations.AddAll(messages)
} else {
Expand Down Expand Up @@ -695,7 +695,7 @@ func (c *client) getFlowKeysFromCache(cache *flowCategoryCache, cacheKey string)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
for _, flow := range fCache {
flowKeys = append(flowKeys, getFlowKey(flow))
flowKeys = append(flowKeys, getFlowModKey(flow))
}
return flowKeys
}
Expand Down
19 changes: 9 additions & 10 deletions pkg/agent/openflow/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1515,13 +1515,12 @@ func (c *policyRuleConjunction) calculateChangesForRuleDeletion() []*conjMatchFl
return ctxChanges
}

// getAllFlowKeys returns the matching strings of actions flows of
// policyRuleConjunction, as well as matching flows of all its clauses.
// getAllFlowKeys returns the match strings used in ovs-ofctl dump-flows command, including
// actions flows of policyRuleConjunction, as well as matching flows of all its clauses.
func (c *policyRuleConjunction) getAllFlowKeys() []string {
flowKeys := []string{}
dropFlowKeys := []string{}
var flowKeys, dropFlowKeys []string
for _, flow := range c.actionFlows {
flowKeys = append(flowKeys, getFlowKey(flow))
flowKeys = append(flowKeys, getFlowDumpKey(flow))
}

addClauseFlowKeys := func(clause *clause) {
Expand All @@ -1530,10 +1529,10 @@ func (c *policyRuleConjunction) getAllFlowKeys() []string {
}
for _, ctx := range clause.matches {
if ctx.flow != nil {
flowKeys = append(flowKeys, getFlowKey(ctx.flow))
flowKeys = append(flowKeys, getFlowDumpKey(ctx.flow))
}
if ctx.dropFlow != nil {
dropFlowKeys = append(dropFlowKeys, getFlowKey(ctx.dropFlow))
dropFlowKeys = append(dropFlowKeys, getFlowDumpKey(ctx.dropFlow))
}
}
}
Expand Down Expand Up @@ -1710,8 +1709,8 @@ func (c *client) DeletePolicyRuleAddress(ruleID uint32, addrType types.AddressTy
return c.featureNetworkPolicy.applyConjunctiveMatchFlows(changes)
}

func (c *client) GetNetworkPolicyFlowKeys(npName, npNamespace string) []string {
flowKeys := []string{}
func (c *client) GetNetworkPolicyFlowKeys(npName, npNamespace string, npType v1beta2.NetworkPolicyType) []string {
var flowKeys []string
// Hold replayMutex write lock to protect flows from being modified by
// NetworkPolicy updates and replayFlows. This is more for logic
// cleanliness, as: for now flow updates do not impact the matching string
Expand All @@ -1727,7 +1726,7 @@ func (c *client) GetNetworkPolicyFlowKeys(npName, npNamespace string) []string {
if conj.npRef == nil {
continue
}
if conj.npRef.Name == npName && conj.npRef.Namespace == npNamespace {
if conj.npRef.Name == npName && conj.npRef.Namespace == npNamespace && conj.npRef.Type == npType {
// There can be duplicated flows added due to conjunctive matches
// shared by multiple policy rules (clauses).
flowKeys = append(flowKeys, conj.getAllFlowKeys()...)
Expand Down
16 changes: 8 additions & 8 deletions pkg/agent/openflow/network_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,11 @@ func TestInstallPolicyRuleFlows(t *testing.T) {
err = c.featureNetworkPolicy.applyConjunctiveMatchFlows(ctxChanges2)
require.Nil(t, err)

assert.Equal(t, 0, len(c.GetNetworkPolicyFlowKeys("np1", "ns1")))
assert.Equal(t, 0, len(c.GetNetworkPolicyFlowKeys("np1", "ns1", v1beta2.K8sNetworkPolicy)))
err = c.InstallPolicyRuleFlows(rule2)
require.Nil(t, err)
checkConjunctionConfig(t, ruleID2, 1, 2, 1, 0)
assert.Equal(t, 6, len(c.GetNetworkPolicyFlowKeys("np1", "ns1")))
assert.Equal(t, 6, len(c.GetNetworkPolicyFlowKeys("np1", "ns1", v1beta2.K8sNetworkPolicy)))

ruleID3 := uint32(103)
port1 := intstr.FromInt(8080)
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestInstallPolicyRuleFlows(t *testing.T) {
err = c.InstallPolicyRuleFlows(rule3)
require.Nil(t, err, "Failed to invoke InstallPolicyRuleFlows")
checkConjunctionConfig(t, ruleID3, 1, 2, 1, 3)
assert.Equal(t, 15, len(c.GetNetworkPolicyFlowKeys("np1", "ns1")))
assert.Equal(t, 15, len(c.GetNetworkPolicyFlowKeys("np1", "ns1", v1beta2.K8sNetworkPolicy)))

ctxChanges4 := conj.calculateChangesForRuleDeletion()
matchFlows4, dropFlows4 := getChangedFlows(ctxChanges4)
Expand All @@ -305,7 +305,7 @@ func TestInstallPolicyRuleFlows(t *testing.T) {
assert.Equal(t, 2, getChangedFlowOPCount(matchFlows5, deletion))
assert.Equal(t, 1, getChangedFlowOPCount(matchFlows5, modification))
err = c.featureNetworkPolicy.applyConjunctiveMatchFlows(ctxChanges5)
assert.Equal(t, 12, len(c.GetNetworkPolicyFlowKeys("np1", "ns1")))
assert.Equal(t, 12, len(c.GetNetworkPolicyFlowKeys("np1", "ns1", v1beta2.K8sNetworkPolicy)))
require.Nil(t, err)
}

Expand Down Expand Up @@ -741,11 +741,11 @@ func TestInstallPolicyRuleFlowsInDualStackCluster(t *testing.T) {
err = c.featureNetworkPolicy.applyConjunctiveMatchFlows(ctxChanges2)
require.Nil(t, err)

assert.Equal(t, 0, len(c.GetNetworkPolicyFlowKeys("np1", "ns1")))
assert.Equal(t, 0, len(c.GetNetworkPolicyFlowKeys("np1", "ns1", v1beta2.K8sNetworkPolicy)))
err = c.InstallPolicyRuleFlows(rule2)
require.Nil(t, err)
checkConjunctionConfig(t, ruleID2, 2, 3, 1, 0)
assert.Equal(t, 9, len(c.GetNetworkPolicyFlowKeys("np1", "ns1")))
assert.Equal(t, 9, len(c.GetNetworkPolicyFlowKeys("np1", "ns1", v1beta2.K8sNetworkPolicy)))

ruleID3 := uint32(103)
port1 := intstr.FromInt(8080)
Expand Down Expand Up @@ -788,7 +788,7 @@ func TestInstallPolicyRuleFlowsInDualStackCluster(t *testing.T) {
err = c.InstallPolicyRuleFlows(rule3)
require.Nil(t, err, "Failed to invoke InstallPolicyRuleFlows")
checkConjunctionConfig(t, ruleID3, 2, 2, 1, 4)
assert.Equal(t, 20, len(c.GetNetworkPolicyFlowKeys("np1", "ns1")))
assert.Equal(t, 20, len(c.GetNetworkPolicyFlowKeys("np1", "ns1", v1beta2.K8sNetworkPolicy)))

ctxChanges4 := conj.calculateChangesForRuleDeletion()
matchFlows4, dropFlows4 := getChangedFlows(ctxChanges4)
Expand All @@ -805,7 +805,7 @@ func TestInstallPolicyRuleFlowsInDualStackCluster(t *testing.T) {
assert.Equal(t, 3, getChangedFlowOPCount(matchFlows5, deletion))
assert.Equal(t, 1, getChangedFlowOPCount(matchFlows5, modification))
err = c.featureNetworkPolicy.applyConjunctiveMatchFlows(ctxChanges5)
assert.Equal(t, 15, len(c.GetNetworkPolicyFlowKeys("np1", "ns1")))
assert.Equal(t, 15, len(c.GetNetworkPolicyFlowKeys("np1", "ns1", v1beta2.K8sNetworkPolicy)))
require.Nil(t, err)
}

Expand Down
Loading

0 comments on commit 81da3d3

Please sign in to comment.