diff --git a/pubsub/go.mod b/pubsub/go.mod index 4a917ad57e..2535558683 100644 --- a/pubsub/go.mod +++ b/pubsub/go.mod @@ -3,29 +3,30 @@ module github.com/GoogleCloudPlatform/golang-samples/pubsub go 1.21 require ( - cloud.google.com/go/bigquery v1.62.0 - cloud.google.com/go/iam v1.2.1 - cloud.google.com/go/pubsub v1.44.0 + cloud.google.com/go/bigquery v1.64.0 + cloud.google.com/go/iam v1.2.2 + cloud.google.com/go/pubsub v1.45.1 + cloud.google.com/go/pubsub/v2 v2.0.0-00010101000000-000000000000 cloud.google.com/go/storage v1.43.0 - cloud.google.com/go/trace v1.11.0 + cloud.google.com/go/trace v1.11.2 github.com/GoogleCloudPlatform/golang-samples v0.0.0-20240820230436-761d0ae7aeff github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.24.1 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 - github.com/googleapis/gax-go/v2 v2.13.0 + github.com/googleapis/gax-go/v2 v2.14.0 github.com/linkedin/goavro/v2 v2.13.0 go.opentelemetry.io/otel v1.29.0 go.opentelemetry.io/otel/sdk v1.29.0 - google.golang.org/api v0.197.0 - google.golang.org/grpc v1.66.2 - google.golang.org/protobuf v1.34.2 + google.golang.org/api v0.211.0 + google.golang.org/grpc v1.67.1 + google.golang.org/protobuf v1.35.2 ) require ( - cloud.google.com/go v0.115.1 // indirect - cloud.google.com/go/auth v0.9.3 // indirect - cloud.google.com/go/auth/oauth2adapt v0.2.4 // indirect - cloud.google.com/go/compute/metadata v0.5.0 // indirect + cloud.google.com/go v0.116.0 // indirect + cloud.google.com/go/auth v0.12.1 // indirect + cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect + cloud.google.com/go/compute/metadata v0.5.2 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.48.1 // indirect github.com/apache/arrow/go/v15 v15.0.2 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -48,18 +49,20 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect go.opentelemetry.io/otel/metric v1.29.0 // indirect go.opentelemetry.io/otel/trace v1.29.0 // indirect - golang.org/x/crypto v0.27.0 // indirect + golang.org/x/crypto v0.30.0 // indirect golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect - golang.org/x/mod v0.18.0 // indirect - golang.org/x/net v0.29.0 // indirect - golang.org/x/oauth2 v0.23.0 // indirect - golang.org/x/sync v0.8.0 // indirect - golang.org/x/sys v0.25.0 // indirect - golang.org/x/text v0.18.0 // indirect - golang.org/x/time v0.6.0 // indirect - golang.org/x/tools v0.22.0 // indirect - golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect - google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect + golang.org/x/mod v0.20.0 // indirect + golang.org/x/net v0.32.0 // indirect + golang.org/x/oauth2 v0.24.0 // indirect + golang.org/x/sync v0.10.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect + golang.org/x/time v0.8.0 // indirect + golang.org/x/tools v0.24.0 // indirect + golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect + google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241118233622-e639e219e697 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583 // indirect ) + +replace cloud.google.com/go/pubsub/v2 => /Users/hongalex/code/cloud/google-cloud-go/pubsub/v2 diff --git a/pubsub/go.sum b/pubsub/go.sum index 2b95e4eb36..7598f39f68 100644 --- a/pubsub/go.sum +++ b/pubsub/go.sum @@ -1,32 +1,32 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -cloud.google.com/go v0.115.1 h1:Jo0SM9cQnSkYfp44+v+NQXHpcHqlnRJk2qxh6yvxxxQ= -cloud.google.com/go v0.115.1/go.mod h1:DuujITeaufu3gL68/lOFIirVNJwQeyf5UXyi+Wbgknc= -cloud.google.com/go/auth v0.9.3 h1:VOEUIAADkkLtyfr3BLa3R8Ed/j6w1jTBmARx+wb5w5U= -cloud.google.com/go/auth v0.9.3/go.mod h1:7z6VY+7h3KUdRov5F1i8NDP5ZzWKYmEPO842BgCsmTk= -cloud.google.com/go/auth/oauth2adapt v0.2.4 h1:0GWE/FUsXhf6C+jAkWgYm7X9tK8cuEIfy19DBn6B6bY= -cloud.google.com/go/auth/oauth2adapt v0.2.4/go.mod h1:jC/jOpwFP6JBxhB3P5Rr0a9HLMC/Pe3eaL4NmdvqPtc= -cloud.google.com/go/bigquery v1.62.0 h1:SYEA2f7fKqbSRRBHb7g0iHTtZvtPSPYdXfmqsjpsBwo= -cloud.google.com/go/bigquery v1.62.0/go.mod h1:5ee+ZkF1x/ntgCsFQJAQTM3QkAZOecfCmvxhkJsWRSA= -cloud.google.com/go/compute/metadata v0.5.0 h1:Zr0eK8JbFv6+Wi4ilXAR8FJ3wyNdpxHKJNPos6LTZOY= -cloud.google.com/go/compute/metadata v0.5.0/go.mod h1:aHnloV2TPI38yx4s9+wAZhHykWvVCfu7hQbF+9CWoiY= -cloud.google.com/go/datacatalog v1.22.0 h1:7e5/0B2LYbNx0BcUJbiCT8K2wCtcB5993z/v1JeLIdc= -cloud.google.com/go/datacatalog v1.22.0/go.mod h1:4Wff6GphTY6guF5WphrD76jOdfBiflDiRGFAxq7t//I= -cloud.google.com/go/iam v1.2.1 h1:QFct02HRb7H12J/3utj0qf5tobFh9V4vR6h9eX5EBRU= -cloud.google.com/go/iam v1.2.1/go.mod h1:3VUIJDPpwT6p/amXRC5GY8fCCh70lxPygguVtI0Z4/g= -cloud.google.com/go/kms v1.19.1 h1:NPE8zjJuMpECvHsx8lsMwQuWWIdJc6iIDHLJGC/J4bw= -cloud.google.com/go/kms v1.19.1/go.mod h1:GRbd2v6e9rAVs+IwOIuePa3xcCm7/XpGNyWtBwwOdRc= -cloud.google.com/go/logging v1.11.0 h1:v3ktVzXMV7CwHq1MBF65wcqLMA7i+z3YxbUsoK7mOKs= -cloud.google.com/go/logging v1.11.0/go.mod h1:5LDiJC/RxTt+fHc1LAt20R9TKiUTReDg6RuuFOZ67+A= -cloud.google.com/go/longrunning v0.6.0 h1:mM1ZmaNsQsnb+5n1DNPeL0KwQd9jQRqSqSDEkBZr+aI= -cloud.google.com/go/longrunning v0.6.0/go.mod h1:uHzSZqW89h7/pasCWNYdUpwGz3PcVWhrWupreVPYLts= -cloud.google.com/go/monitoring v1.21.0 h1:EMc0tB+d3lUewT2NzKC/hr8cSR9WsUieVywzIHetGro= -cloud.google.com/go/monitoring v1.21.0/go.mod h1:tuJ+KNDdJbetSsbSGTqnaBvbauS5kr3Q/koy3Up6r+4= -cloud.google.com/go/pubsub v1.44.0 h1:pLaMJVDTlnUDIKT5L0k53YyLszfBbGoUBo/IqDK/fEI= -cloud.google.com/go/pubsub v1.44.0/go.mod h1:BD4a/kmE8OePyHoa1qAHEw1rMzXX+Pc8Se54T/8mc3I= +cloud.google.com/go v0.116.0 h1:B3fRrSDkLRt5qSHWe40ERJvhvnQwdZiHu0bJOpldweE= +cloud.google.com/go v0.116.0/go.mod h1:cEPSRWPzZEswwdr9BxE6ChEn01dWlTaF05LiC2Xs70U= +cloud.google.com/go/auth v0.12.1 h1:n2Bj25BUMM0nvE9D2XLTiImanwZhO3DkfWSYS/SAJP4= +cloud.google.com/go/auth v0.12.1/go.mod h1:BFMu+TNpF3DmvfBO9ClqTR/SiqVIm7LukKF9mbendF4= +cloud.google.com/go/auth/oauth2adapt v0.2.6 h1:V6a6XDu2lTwPZWOawrAa9HUK+DB2zfJyTuciBG5hFkU= +cloud.google.com/go/auth/oauth2adapt v0.2.6/go.mod h1:AlmsELtlEBnaNTL7jCj8VQFLy6mbZv0s4Q7NGBeQ5E8= +cloud.google.com/go/bigquery v1.64.0 h1:vSSZisNyhr2ioJE1OuYBQrnrpB7pIhRQm4jfjc7E/js= +cloud.google.com/go/bigquery v1.64.0/go.mod h1:gy8Ooz6HF7QmA+TRtX8tZmXBKH5mCFBwUApGAb3zI7Y= +cloud.google.com/go/compute/metadata v0.5.2 h1:UxK4uu/Tn+I3p2dYWTfiX4wva7aYlKixAHn3fyqngqo= +cloud.google.com/go/compute/metadata v0.5.2/go.mod h1:C66sj2AluDcIqakBq/M8lw8/ybHgOZqin2obFxa/E5k= +cloud.google.com/go/datacatalog v1.23.0 h1:9F2zIbWNNmtrSkPIyGRQNsIugG5VgVVFip6+tXSdWLg= +cloud.google.com/go/datacatalog v1.23.0/go.mod h1:9Wamq8TDfL2680Sav7q3zEhBJSPBrDxJU8WtPJ25dBM= +cloud.google.com/go/iam v1.2.2 h1:ozUSofHUGf/F4tCNy/mu9tHLTaxZFLOUiKzjcgWHGIA= +cloud.google.com/go/iam v1.2.2/go.mod h1:0Ys8ccaZHdI1dEUilwzqng/6ps2YB6vRsjIe00/+6JY= +cloud.google.com/go/kms v1.20.1 h1:og29Wv59uf2FVaZlesaiDAqHFzHaoUyHI3HYp9VUHVg= +cloud.google.com/go/kms v1.20.1/go.mod h1:LywpNiVCvzYNJWS9JUcGJSVTNSwPwi0vBAotzDqn2nc= +cloud.google.com/go/logging v1.12.0 h1:ex1igYcGFd4S/RZWOCU51StlIEuey5bjqwH9ZYjHibk= +cloud.google.com/go/logging v1.12.0/go.mod h1:wwYBt5HlYP1InnrtYI0wtwttpVU1rifnMT7RejksUAM= +cloud.google.com/go/longrunning v0.6.2 h1:xjDfh1pQcWPEvnfjZmwjKQEcHnpz6lHjfy7Fo0MK+hc= +cloud.google.com/go/longrunning v0.6.2/go.mod h1:k/vIs83RN4bE3YCswdXC5PFfWVILjm3hpEUlSko4PiI= +cloud.google.com/go/monitoring v1.21.2 h1:FChwVtClH19E7pJ+e0xUhJPGksctZNVOk2UhMmblmdU= +cloud.google.com/go/monitoring v1.21.2/go.mod h1:hS3pXvaG8KgWTSz+dAdyzPrGUYmi2Q+WFX8g2hqVEZU= +cloud.google.com/go/pubsub v1.45.1 h1:ZC/UzYcrmK12THWn1P72z+Pnp2vu/zCZRXyhAfP1hJY= +cloud.google.com/go/pubsub v1.45.1/go.mod h1:3bn7fTmzZFwaUjllitv1WlsNMkqBgGUb3UdMhI54eCc= cloud.google.com/go/storage v1.43.0 h1:CcxnSohZwizt4LCzQHWvBf1/kvtHUn7gk9QERXPyXFs= cloud.google.com/go/storage v1.43.0/go.mod h1:ajvxEa7WmZS1PxvKRq4bq0tFT3vMd502JwstCcYv0Q0= -cloud.google.com/go/trace v1.11.0 h1:UHX6cOJm45Zw/KIbqHe4kII8PupLt/V5tscZUkeiJVI= -cloud.google.com/go/trace v1.11.0/go.mod h1:Aiemdi52635dBR7o3zuc9lLjXo3BwGaChEjCa3tJNmM= +cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI= +cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/GoogleCloudPlatform/golang-samples v0.0.0-20240820230436-761d0ae7aeff h1:eoQLT2CbHlA5oNrfUnVbRjM2aXp9lHBNhdnCe9oDKj4= github.com/GoogleCloudPlatform/golang-samples v0.0.0-20240820230436-761d0ae7aeff/go.mod h1:zNbBG/YoLJKDB1iQueDUxex/8bI9YqLK3BTh2kMtejI= @@ -95,8 +95,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gTgghdIA6Stxb52D5RnLI1SLyw= github.com/googleapis/enterprise-certificate-proxy v0.3.4/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA= -github.com/googleapis/gax-go/v2 v2.13.0 h1:yitjD5f7jQHhyDsnhKEBU52NdvvdSeGzlAnDPT0hH1s= -github.com/googleapis/gax-go/v2 v2.13.0/go.mod h1:Z/fvTZXF8/uw7Xu5GuslPw+bplx6SS338j1Is2S+B7A= +github.com/googleapis/gax-go/v2 v2.14.0 h1:f+jMrjBPl+DL9nI4IQzLUxMq7XrAqFYB7hBPqMNIe8o= +github.com/googleapis/gax-go/v2 v2.14.0/go.mod h1:lhBCnjdLrWRaPvLWhmc8IS24m9mr07qSYnHncrgo+zk= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= @@ -139,77 +139,77 @@ go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt3 go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= -golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= +golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0= -golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0= +golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= -golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= +golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= -golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= -golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE= +golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= -golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= -golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA= -golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c= +golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= +golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= -golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= +golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= +golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o= gonum.org/v1/gonum v0.12.0/go.mod h1:73TDxJfAAHeA8Mk9mf8NlIppyhQNo5GLTcYeqgo2lvY= -google.golang.org/api v0.197.0 h1:x6CwqQLsFiA5JKAiGyGBjc2bNtHtLddhJCE2IKuhhcQ= -google.golang.org/api v0.197.0/go.mod h1:AuOuo20GoQ331nq7DquGHlU6d+2wN2fZ8O0ta60nRNw= +google.golang.org/api v0.211.0 h1:IUpLjq09jxBSV1lACO33CGY3jsRcbctfGzhj+ZSE/Bg= +google.golang.org/api v0.211.0/go.mod h1:XOloB4MXFH4UTlQSGuNUxw0UT74qdENK8d6JNsXKLi0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1 h1:BulPr26Jqjnd4eYDVe+YvyR7Yc2vJGkO5/0UxD0/jZU= -google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:hL97c3SYopEHblzpxRL4lSs523++l8DYxGM1FQiYmb4= -google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 h1:hjSy6tcFQZ171igDaN5QHOw2n6vx40juYbC/x67CEhc= -google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:qpvKtACPCQhAdu3PyQgV4l3LMXZEtft7y8QcarRsp9I= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 h1:ToEetK57OidYuqD4Q5w+vfEnPvPpuTwedCNVohYJfNk= +google.golang.org/genproto v0.0.0-20241118233622-e639e219e697/go.mod h1:JJrvXBWRZaFMxBufik1a4RpFw4HhgVtBBWQeQgUj2cc= +google.golang.org/genproto/googleapis/api v0.0.0-20241118233622-e639e219e697 h1:pgr/4QbFyktUv9CtQ/Fq4gzEE6/Xs7iCXbktaGzLHbQ= +google.golang.org/genproto/googleapis/api v0.0.0-20241118233622-e639e219e697/go.mod h1:+D9ySVjN8nY8YCVjc5O7PZDIdZporIDY3KaGfJunh88= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583 h1:IfdSdTcLFy4lqUQrQJLkLt1PB+AsqVz6lwkWPzWEz10= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= -google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo= -google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -219,8 +219,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/pubsub/topics/add_users.go b/pubsub/topics/add_users.go index 46bc3226d8..41e2853c7b 100644 --- a/pubsub/topics/add_users.go +++ b/pubsub/topics/add_users.go @@ -18,12 +18,13 @@ package topics import ( "context" "fmt" + "io" - "cloud.google.com/go/iam" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/iam/apiv1/iampb" + "cloud.google.com/go/pubsub/v2" ) -func addUsers(projectID, topicID string) error { +func addUsers(w io.Writer, projectID, topicID string) error { // projectID := "my-project-id" // topicID := "my-topic" ctx := context.Background() @@ -33,21 +34,34 @@ func addUsers(projectID, topicID string) error { } defer client.Close() - topic := client.Topic(topicID) - policy, err := topic.IAM().Policy(ctx) + topicName := fmt.Sprintf("projects/%s/topics/%s", projectID, topicID) + req := &iampb.GetIamPolicyRequest{ + Resource: topicName, + } + policy, err := client.TopicAdminClient.GetIamPolicy(ctx, req) if err != nil { - return fmt.Errorf("Policy: %w", err) + return fmt.Errorf("error calling GetIamPolicy: %w", err) + } + b1 := &iampb.Binding{ + Role: "roles/viewer", + Members: []string{"allUsers"}, + } + b2 := &iampb.Binding{ + Role: "roles/editor", + // Other valid prefixes are "serviceAccount:", "user:" + // See the documentation for more values. + Members: []string{"group:cloud-logs@google.com"}, } - // Other valid prefixes are "serviceAccount:", "user:" - // See the documentation for more values. - policy.Add(iam.AllUsers, iam.Viewer) - policy.Add("group:cloud-logs@google.com", iam.Editor) - if err := topic.IAM().SetPolicy(ctx, policy); err != nil { - return fmt.Errorf("SetPolicy: %w", err) + policy.Bindings = append(policy.Bindings, b1, b2) + + setRequest := &iampb.SetIamPolicyRequest{ + Resource: topicName, + Policy: policy, + } + _, err = client.TopicAdminClient.SetIamPolicy(ctx, setRequest) + if err != nil { + return fmt.Errorf("error calling SetIamPolicy: %w", err) } - // NOTE: It may be necessary to retry this operation if IAM policies are - // being modified concurrently. SetPolicy will return an error if the policy - // was modified since it was retrieved. return nil } diff --git a/pubsub/topics/create.go b/pubsub/topics/create.go index cef93ef91f..3fa7c3348d 100644 --- a/pubsub/topics/create.go +++ b/pubsub/topics/create.go @@ -20,7 +20,8 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" ) func create(w io.Writer, projectID, topicID string) error { @@ -33,7 +34,10 @@ func create(w io.Writer, projectID, topicID string) error { } defer client.Close() - t, err := client.CreateTopic(ctx, topicID) + topic := &pubsubpb.Topic{ + Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID), + } + t, err := client.TopicAdminClient.CreateTopic(ctx, topic) if err != nil { return fmt.Errorf("CreateTopic: %w", err) } diff --git a/pubsub/topics/create_topic_gcs_ingestion.go b/pubsub/topics/create_topic_gcs_ingestion.go index 8e8d619ffc..7e92f3b8fe 100644 --- a/pubsub/topics/create_topic_gcs_ingestion.go +++ b/pubsub/topics/create_topic_gcs_ingestion.go @@ -21,15 +21,18 @@ import ( "io" "time" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" + "google.golang.org/protobuf/types/known/timestamppb" ) -func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime string) error { +func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime, delimiter string) error { // projectID := "my-project-id" // topicID := "my-topic" // bucket := "my-bucket" // matchGlob := "**.txt" // minimumObjectCreateTime := "2006-01-02T15:04:05Z" + // delimiter := "," ctx := context.Background() client, err := pubsub.NewClient(ctx, projectID) @@ -43,20 +46,25 @@ func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucke return err } - cfg := &pubsub.TopicConfig{ - IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{ - Source: &pubsub.IngestionDataSourceCloudStorage{ - Bucket: bucket, - // Alternatively, can be Avro or PubSubAvro formats. See - InputFormat: &pubsub.IngestionDataSourceCloudStorageTextFormat{ - Delimiter: ",", + topicpb := &pubsubpb.Topic{ + Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID), + IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{ + Source: &pubsubpb.IngestionDataSourceSettings_CloudStorage_{ + CloudStorage: &pubsubpb.IngestionDataSourceSettings_CloudStorage{ + Bucket: bucket, + // Alternatively, can be Avro or PubSubAvro formats. See + InputFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat_{ + TextFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat{ + Delimiter: &delimiter, + }, + }, + MatchGlob: matchGlob, + MinimumObjectCreateTime: timestamppb.New(minCreateTime), }, - MatchGlob: matchGlob, - MinimumObjectCreateTime: minCreateTime, }, }, } - t, err := client.CreateTopicWithConfig(ctx, topicID, cfg) + t, err := client.TopicAdminClient.CreateTopic(ctx, topicpb) if err != nil { return fmt.Errorf("CreateTopic: %w", err) } diff --git a/pubsub/topics/create_topic_kinesis_ingestion.go b/pubsub/topics/create_topic_kinesis_ingestion.go index 036dccf6c7..75afea0818 100644 --- a/pubsub/topics/create_topic_kinesis_ingestion.go +++ b/pubsub/topics/create_topic_kinesis_ingestion.go @@ -20,7 +20,8 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + pubsub "cloud.google.com/go/pubsub/v2" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" ) func createTopicWithKinesisIngestion(w io.Writer, projectID, topicID string) error { @@ -38,21 +39,23 @@ func createTopicWithKinesisIngestion(w io.Writer, projectID, topicID string) err } defer client.Close() - cfg := &pubsub.TopicConfig{ - IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{ - Source: &pubsub.IngestionDataSourceAWSKinesis{ - StreamARN: streamARN, - ConsumerARN: consumerARN, - AWSRoleARN: awsRoleARN, - GCPServiceAccount: gcpServiceAccount, + topicpb := &pubsubpb.Topic{ + IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{ + Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{ + AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{ + StreamArn: streamARN, + ConsumerArn: consumerARN, + AwsRoleArn: awsRoleARN, + GcpServiceAccount: gcpServiceAccount, + }, }, }, } - t, err := client.CreateTopicWithConfig(ctx, topicID, cfg) + topicpb, err = client.TopicAdminClient.CreateTopic(ctx, topicpb) if err != nil { - return fmt.Errorf("CreateTopic: %w", err) + return fmt.Errorf("failed to create topic with kinesis: %w", err) } - fmt.Fprintf(w, "Kinesis topic created: %v\n", t) + fmt.Fprintf(w, "Kinesis topic created: %v\n", topicpb) return nil } diff --git a/pubsub/topics/delete.go b/pubsub/topics/delete.go index e92fbfc0e2..316259d846 100644 --- a/pubsub/topics/delete.go +++ b/pubsub/topics/delete.go @@ -20,7 +20,8 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" ) func delete(w io.Writer, projectID, topicID string) error { @@ -33,11 +34,14 @@ func delete(w io.Writer, projectID, topicID string) error { } defer client.Close() - t := client.Topic(topicID) - if err := t.Delete(ctx); err != nil { - return fmt.Errorf("Delete: %w", err) + req := &pubsubpb.DeleteTopicRequest{ + Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID), } - fmt.Fprintf(w, "Deleted topic: %v\n", t) + err = client.TopicAdminClient.DeleteTopic(ctx, req) + if err != nil { + return fmt.Errorf("failed to delete topic: %w", err) + } + fmt.Fprintln(w, "Deleted topic") return nil } diff --git a/pubsub/topics/list_subs.go b/pubsub/topics/list_subs.go index 1eb31b41bf..da1f3d5d73 100644 --- a/pubsub/topics/list_subs.go +++ b/pubsub/topics/list_subs.go @@ -18,35 +18,38 @@ package topics import ( "context" "fmt" + "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" "google.golang.org/api/iterator" ) -func listSubscriptions(projectID, topicID string) ([]*pubsub.Subscription, error) { +func listSubscriptions(w io.Writer, projectID, topicID string) error { // projectID := "my-project-id" // topicName := "projects/sample-248520/topics/ocr-go-test-topic" ctx := context.Background() client, err := pubsub.NewClient(ctx, projectID) if err != nil { - return nil, fmt.Errorf("pubsub.NewClient: %w", err) + return fmt.Errorf("pubsub.NewClient: %w", err) } defer client.Close() - var subs []*pubsub.Subscription - - it := client.Topic(topicID).Subscriptions(ctx) + req := &pubsubpb.ListTopicSubscriptionsRequest{ + Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID), + } + it := client.TopicAdminClient.ListTopicSubscriptions(ctx, req) for { sub, err := it.Next() if err == iterator.Done { break } if err != nil { - return nil, fmt.Errorf("Next: %w", err) + return fmt.Errorf("error listing topic subscriptions: %w", err) } - subs = append(subs, sub) + fmt.Fprintf(w, "got subscription: %s\n", sub) } - return subs, nil + return nil } // [END pubsub_list_topic_subscriptions] diff --git a/pubsub/topics/list_topics.go b/pubsub/topics/list_topics.go index 62c9c64d1a..bbfc26ccf8 100644 --- a/pubsub/topics/list_topics.go +++ b/pubsub/topics/list_topics.go @@ -18,35 +18,38 @@ package topics import ( "context" "fmt" + "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" "google.golang.org/api/iterator" ) -func list(projectID string) ([]*pubsub.Topic, error) { +func listTopics(w io.Writer, projectID string) error { // projectID := "my-project-id" ctx := context.Background() client, err := pubsub.NewClient(ctx, projectID) if err != nil { - return nil, fmt.Errorf("pubsub.NewClient: %w", err) + return fmt.Errorf("pubsub.NewClient: %w", err) } defer client.Close() - var topics []*pubsub.Topic - - it := client.Topics(ctx) + req := &pubsubpb.ListTopicsRequest{ + Project: fmt.Sprintf("projects/%s", projectID), + } + it := client.TopicAdminClient.ListTopics(ctx, req) for { topic, err := it.Next() if err == iterator.Done { break } if err != nil { - return nil, fmt.Errorf("Next: %w", err) + return fmt.Errorf("error listing topics: %w", err) } - topics = append(topics, topic) - } + fmt.Fprintf(w, "got topic: %s\n", topic) - return topics, nil + } + return nil } // [END pubsub_list_topics] diff --git a/pubsub/topics/policy.go b/pubsub/topics/policy.go index fa3c71d098..9c0e8c221f 100644 --- a/pubsub/topics/policy.go +++ b/pubsub/topics/policy.go @@ -20,28 +20,33 @@ import ( "fmt" "io" - "cloud.google.com/go/iam" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/iam/apiv1/iampb" + "cloud.google.com/go/pubsub/v2" ) -func policy(w io.Writer, projectID, topicID string) (*iam.Policy, error) { +func getIAMPolicy(w io.Writer, projectID, topicID string) error { // projectID := "my-project-id" // topicID := "my-topic" ctx := context.Background() client, err := pubsub.NewClient(ctx, projectID) if err != nil { - return nil, fmt.Errorf("pubsub.NewClient: %w", err) + return fmt.Errorf("pubsub.NewClient: %w", err) } defer client.Close() - policy, err := client.Topic(topicID).IAM().Policy(ctx) + req := &iampb.GetIamPolicyRequest{ + Resource: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID), + } + policy, err := client.TopicAdminClient.GetIamPolicy(ctx, req) if err != nil { - return nil, fmt.Errorf("Policy: %w", err) + return fmt.Errorf("Policy: %w", err) } - for _, role := range policy.Roles() { - fmt.Fprint(w, policy.Members(role)) + for _, b := range policy.Bindings { + for _, m := range b.Members { + fmt.Fprintf(w, "role: %s, member: %s\n", b.Role, m) + } } - return policy, nil + return nil } // [END pubsub_get_topic_policy] diff --git a/pubsub/topics/publish.go b/pubsub/topics/publish.go index acc3b5095c..81e78c43c4 100644 --- a/pubsub/topics/publish.go +++ b/pubsub/topics/publish.go @@ -20,7 +20,7 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" ) func publish(w io.Writer, projectID, topicID, msg string) error { @@ -34,8 +34,9 @@ func publish(w io.Writer, projectID, topicID, msg string) error { } defer client.Close() - t := client.Topic(topicID) - result := t.Publish(ctx, &pubsub.Message{ + // Make sure to reuse this publisher across publishes. + p := client.Publisher(topicID) + result := p.Publish(ctx, &pubsub.Message{ Data: []byte(msg), }) // Block until the result is returned and a server-generated diff --git a/pubsub/topics/publish_custom.go b/pubsub/topics/publish_custom.go index cbd2334ce4..9b9e18cb4c 100644 --- a/pubsub/topics/publish_custom.go +++ b/pubsub/topics/publish_custom.go @@ -20,7 +20,7 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" ) func publishCustomAttributes(w io.Writer, projectID, topicID string) error { @@ -33,8 +33,9 @@ func publishCustomAttributes(w io.Writer, projectID, topicID string) error { } defer client.Close() - t := client.Topic(topicID) - result := t.Publish(ctx, &pubsub.Message{ + // Make sure to reuse this publisher across publishes. + p := client.Publisher(topicID) + result := p.Publish(ctx, &pubsub.Message{ Data: []byte("Hello world!"), Attributes: map[string]string{ "origin": "golang", diff --git a/pubsub/topics/publish_flow_control.go b/pubsub/topics/publish_flow_control.go index 402fec00f1..bfd375230d 100644 --- a/pubsub/topics/publish_flow_control.go +++ b/pubsub/topics/publish_flow_control.go @@ -23,7 +23,7 @@ import ( "sync" "sync/atomic" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" ) func publishWithFlowControlSettings(w io.Writer, projectID, topicID string) error { @@ -36,8 +36,9 @@ func publishWithFlowControlSettings(w io.Writer, projectID, topicID string) erro } defer client.Close() - t := client.Topic(topicID) - t.PublishSettings.FlowControlSettings = pubsub.FlowControlSettings{ + // Make sure to reuse this publisher across publishes. + p := client.Publisher(topicID) + p.PublishSettings.FlowControlSettings = pubsub.FlowControlSettings{ MaxOutstandingMessages: 100, // default 1000 MaxOutstandingBytes: 10 * 1024 * 1024, // default 0 (unlimited) LimitExceededBehavior: pubsub.FlowControlBlock, // default Ignore, other options: Block and SignalError @@ -50,7 +51,7 @@ func publishWithFlowControlSettings(w io.Writer, projectID, topicID string) erro // Rapidly publishing 1000 messages in a loop may be constrained by flow control. for i := 0; i < numMsgs; i++ { wg.Add(1) - result := t.Publish(ctx, &pubsub.Message{ + result := p.Publish(ctx, &pubsub.Message{ Data: []byte("message #" + strconv.Itoa(i)), }) go func(i int, res *pubsub.PublishResult) { diff --git a/pubsub/topics/publish_ordering.go b/pubsub/topics/publish_ordering.go index 846488cb53..5efdd67f5a 100644 --- a/pubsub/topics/publish_ordering.go +++ b/pubsub/topics/publish_ordering.go @@ -22,7 +22,7 @@ import ( "sync" "sync/atomic" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" "google.golang.org/api/option" ) @@ -43,8 +43,10 @@ func publishWithOrderingKey(w io.Writer, projectID, topicID string) { var wg sync.WaitGroup var totalErrors uint64 - t := client.Topic(topicID) - t.EnableMessageOrdering = true + + // Make sure to reuse this publisher across publishes. + p := client.Publisher(topicID) + p.EnableMessageOrdering = true messages := []struct { message string @@ -69,7 +71,7 @@ func publishWithOrderingKey(w io.Writer, projectID, topicID string) { } for _, m := range messages { - res := t.Publish(ctx, &pubsub.Message{ + res := p.Publish(ctx, &pubsub.Message{ Data: []byte(m.message), OrderingKey: m.orderingKey, }) diff --git a/pubsub/topics/publish_otel_tracing.go b/pubsub/topics/publish_otel_tracing.go index c18a6e6976..76648bc4a1 100644 --- a/pubsub/topics/publish_otel_tracing.go +++ b/pubsub/topics/publish_otel_tracing.go @@ -20,7 +20,7 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" "go.opentelemetry.io/otel" "google.golang.org/api/option" @@ -73,8 +73,9 @@ func publishOpenTelemetryTracing(w io.Writer, projectID, topicID string, samplin } defer client.Close() - t := client.Topic(topicID) - result := t.Publish(ctx, &pubsub.Message{ + // Make sure to reuse this publisher across publishes. + p := client.Publisher(topicID) + result := p.Publish(ctx, &pubsub.Message{ Data: []byte("Publishing message with tracing"), }) if _, err := result.Get(ctx); err != nil { diff --git a/pubsub/topics/publish_resume_ordering.go b/pubsub/topics/publish_resume_ordering.go index f63244141e..7fc1061f70 100644 --- a/pubsub/topics/publish_resume_ordering.go +++ b/pubsub/topics/publish_resume_ordering.go @@ -20,7 +20,7 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" "google.golang.org/api/option" ) @@ -39,11 +39,12 @@ func resumePublishWithOrderingKey(w io.Writer, projectID, topicID string) { } defer client.Close() - t := client.Topic(topicID) - t.EnableMessageOrdering = true + // Make sure to reuse this publisher across publishes. + p := client.Publisher(topicID) + p.EnableMessageOrdering = true key := "some-ordering-key" - res := t.Publish(ctx, &pubsub.Message{ + res := p.Publish(ctx, &pubsub.Message{ Data: []byte("some-message"), OrderingKey: key, }) @@ -55,7 +56,7 @@ func resumePublishWithOrderingKey(w io.Writer, projectID, topicID string) { // Resume publish on an ordering key that has had unrecoverable errors. // After such an error publishes with this ordering key will fail // until this method is called. - t.ResumePublish(key) + p.ResumePublish(key) } fmt.Fprint(w, "Published a message with ordering key successfully\n") diff --git a/pubsub/topics/publish_retry.go b/pubsub/topics/publish_retry.go index 2cb29c346e..0849eb0f85 100644 --- a/pubsub/topics/publish_retry.go +++ b/pubsub/topics/publish_retry.go @@ -21,8 +21,8 @@ import ( "io" "time" - "cloud.google.com/go/pubsub" - vkit "cloud.google.com/go/pubsub/apiv1" + "cloud.google.com/go/pubsub/v2" + vkit "cloud.google.com/go/pubsub/v2/apiv1" gax "github.com/googleapis/gax-go/v2" "google.golang.org/grpc/codes" ) @@ -34,7 +34,7 @@ func publishWithRetrySettings(w io.Writer, projectID, topicID, msg string) error ctx := context.Background() config := &pubsub.ClientConfig{ - PublisherCallOptions: &vkit.PublisherCallOptions{ + PublisherCallOptions: &vkit.TopicAdminCallOptions{ Publish: []gax.CallOption{ gax.WithRetry(func() gax.Retryer { return gax.OnCodes([]codes.Code{ @@ -61,8 +61,9 @@ func publishWithRetrySettings(w io.Writer, projectID, topicID, msg string) error } defer client.Close() - t := client.Topic(topicID) - result := t.Publish(ctx, &pubsub.Message{ + // Make sure to reuse this publisher across publishes. + p := client.Publisher(topicID) + result := p.Publish(ctx, &pubsub.Message{ Data: []byte(msg), }) // Block until the result is returned and a server-generated diff --git a/pubsub/topics/publish_scale.go b/pubsub/topics/publish_scale.go index 3f66d6d907..bed733fb50 100644 --- a/pubsub/topics/publish_scale.go +++ b/pubsub/topics/publish_scale.go @@ -23,7 +23,7 @@ import ( "sync" "sync/atomic" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" ) func publishThatScales(w io.Writer, projectID, topicID string, n int) error { @@ -38,10 +38,12 @@ func publishThatScales(w io.Writer, projectID, topicID string, n int) error { var wg sync.WaitGroup var totalErrors uint64 - t := client.Topic(topicID) + + // Make sure to reuse this publisher across publishes. + p := client.Publisher(topicID) for i := 0; i < n; i++ { - result := t.Publish(ctx, &pubsub.Message{ + result := p.Publish(ctx, &pubsub.Message{ Data: []byte("Message " + strconv.Itoa(i)), }) diff --git a/pubsub/topics/publish_settings.go b/pubsub/topics/publish_settings.go index 7684e8d037..28856547aa 100644 --- a/pubsub/topics/publish_settings.go +++ b/pubsub/topics/publish_settings.go @@ -22,7 +22,7 @@ import ( "strconv" "time" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" ) func publishWithSettings(w io.Writer, projectID, topicID string) error { @@ -34,15 +34,17 @@ func publishWithSettings(w io.Writer, projectID, topicID string) error { return fmt.Errorf("pubsub.NewClient: %w", err) } defer client.Close() + + // Make sure to reuse this publisher across publishes. + p := client.Publisher(topicID) + p.PublishSettings.ByteThreshold = 5000 + p.PublishSettings.CountThreshold = 10 + p.PublishSettings.DelayThreshold = 100 * time.Millisecond + var results []*pubsub.PublishResult var resultErrors []error - t := client.Topic(topicID) - t.PublishSettings.ByteThreshold = 5000 - t.PublishSettings.CountThreshold = 10 - t.PublishSettings.DelayThreshold = 100 * time.Millisecond - for i := 0; i < 10; i++ { - result := t.Publish(ctx, &pubsub.Message{ + result := p.Publish(ctx, &pubsub.Message{ Data: []byte("Message " + strconv.Itoa(i)), }) results = append(results, result) diff --git a/pubsub/topics/publish_single.go b/pubsub/topics/publish_single.go index 47bde5e200..b7da6ac33a 100644 --- a/pubsub/topics/publish_single.go +++ b/pubsub/topics/publish_single.go @@ -20,7 +20,7 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" ) func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error { @@ -34,10 +34,11 @@ func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error { } defer client.Close() - t := client.Topic(topicID) - t.PublishSettings.NumGoroutines = 1 + // Make sure to reuse this publisher across publishes. + p := client.Publisher(topicID) + p.PublishSettings.NumGoroutines = 1 - result := t.Publish(ctx, &pubsub.Message{Data: []byte(msg)}) + result := p.Publish(ctx, &pubsub.Message{Data: []byte(msg)}) // Block until the result is returned and a server-generated // ID is returned for the published message. id, err := result.Get(ctx) diff --git a/pubsub/topics/publish_with_compression.go b/pubsub/topics/publish_with_compression.go index df5c2768ed..5e99585cb4 100644 --- a/pubsub/topics/publish_with_compression.go +++ b/pubsub/topics/publish_with_compression.go @@ -20,7 +20,7 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" ) func publishWithCompression(w io.Writer, projectID, topicID string) error { @@ -34,12 +34,14 @@ func publishWithCompression(w io.Writer, projectID, topicID string) error { } defer client.Close() - t := client.Topic(topicID) + // Make sure to reuse this publisher across publishes. + p := client.Publisher(topicID) + // Enable compression and configure the compression threshold to 10 bytes (default to 240 B). // Publish requests of sizes > 10 B (excluding the request headers) will get compressed. - t.PublishSettings.EnableCompression = true - t.PublishSettings.CompressionBytesThreshold = 10 - result := t.Publish(ctx, &pubsub.Message{ + p.PublishSettings.EnableCompression = true + p.PublishSettings.CompressionBytesThreshold = 10 + result := p.Publish(ctx, &pubsub.Message{ Data: []byte("This is a test message"), }) // Block until the result is returned and a server-generated diff --git a/pubsub/topics/test_permissions.go b/pubsub/topics/test_permissions.go index 9353713b83..857f9a69a4 100644 --- a/pubsub/topics/test_permissions.go +++ b/pubsub/topics/test_permissions.go @@ -20,7 +20,8 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/iam/apiv1/iampb" + "cloud.google.com/go/pubsub/v2" ) func testPermissions(w io.Writer, projectID, topicID string) ([]string, error) { @@ -32,18 +33,21 @@ func testPermissions(w io.Writer, projectID, topicID string) ([]string, error) { return nil, fmt.Errorf("pubsub.NewClient: %w", err) } - topic := client.Topic(topicID) - perms, err := topic.IAM().TestPermissions(ctx, []string{ - "pubsub.topics.publish", - "pubsub.topics.update", - }) + req := &iampb.TestIamPermissionsRequest{ + Resource: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID), + Permissions: []string{ + "pubsub.topics.publish", + "pubsub.topics.update", + }, + } + resp, err := client.TopicAdminClient.TestIamPermissions(ctx, req) if err != nil { - return nil, fmt.Errorf("TestPermissions: %w", err) + return nil, fmt.Errorf("error calling TestIamPermissions: %w", err) } - for _, perm := range perms { + for _, perm := range resp.Permissions { fmt.Fprintf(w, "Allowed: %v\n", perm) } - return perms, nil + return resp.Permissions, nil } // [END pubsub_test_topic_permissions] diff --git a/pubsub/topics/topics_test.go b/pubsub/topics/topics_test.go index a462cfbe9c..5049c5ee36 100644 --- a/pubsub/topics/topics_test.go +++ b/pubsub/topics/topics_test.go @@ -20,16 +20,16 @@ import ( "bytes" "context" "fmt" - "io/ioutil" + "io" "strconv" "strings" "sync" "testing" "time" - "cloud.google.com/go/iam" - "cloud.google.com/go/pubsub" "cloud.google.com/go/pubsub/pstest" + "cloud.google.com/go/pubsub/v2" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" trace "cloud.google.com/go/trace/apiv1" "cloud.google.com/go/trace/apiv1/tracepb" "github.com/GoogleCloudPlatform/golang-samples/internal/testutil" @@ -37,6 +37,7 @@ import ( ) var topicID string +var topicName string const ( topicPrefix = "topic" @@ -59,9 +60,12 @@ func setup(t *testing.T) *pubsub.Client { once.Do(func() { topicID = fmt.Sprintf("%s-%d", topicPrefix, time.Now().UnixNano()) + topicName = fmt.Sprintf("projects/%s/topics/%s", tc.ProjectID, topicID) // Cleanup resources from previous tests. - it := client.Topics(ctx) + it := client.TopicAdminClient.ListTopics(ctx, &pubsubpb.ListTopicsRequest{ + Project: tc.ProjectID, + }) for { t, err := it.Next() if err == iterator.Done { @@ -70,7 +74,8 @@ func setup(t *testing.T) *pubsub.Client { if err != nil { return } - tID := t.ID() + tt := strings.Split(t.GetName(), "/") + tID := tt[len(tt)-1] p := strings.Split(tID, "-") // Only delete resources created from these tests. @@ -82,7 +87,10 @@ func setup(t *testing.T) *pubsub.Client { } timeTCreated := time.Unix(0, timestamp) if time.Since(timeTCreated) > expireAge { - if err := t.Delete(ctx); err != nil { + req := &pubsubpb.DeleteTopicRequest{ + Topic: t.GetName(), + } + if err := client.TopicAdminClient.DeleteTopic(ctx, req); err != nil { fmt.Printf("Delete topic err: %v: %v", t.String(), err) } } @@ -94,41 +102,20 @@ func setup(t *testing.T) *pubsub.Client { } func TestCreate(t *testing.T) { - client := setup(t) tc := testutil.SystemTest(t) buf := new(bytes.Buffer) if err := create(buf, tc.ProjectID, topicID); err != nil { t.Fatalf("failed to create a topic: %v", err) } - ok, err := client.Topic(topicID).Exists(context.Background()) - if err != nil { - t.Fatalf("failed to check if topic exists: %v", err) - } - if !ok { - t.Fatalf("got none; want topic = %q", topicID) - } } func TestList(t *testing.T) { tc := testutil.SystemTest(t) testutil.Retry(t, 10, time.Second, func(r *testutil.R) { - topics, err := list(tc.ProjectID) - if err != nil { + if err := listTopics(io.Discard, tc.ProjectID); err != nil { r.Errorf("failed to list topics: %v", err) } - - for _, t := range topics { - if t.ID() == topicID { - return // PASS - } - } - - topicIDs := make([]string, len(topics)) - for i, t := range topics { - topicIDs[i] = t.ID() - } - r.Errorf("got %+v; want a list with topic = %q", topicIDs, topicID) }) } @@ -138,7 +125,7 @@ func TestPublish(t *testing.T) { ctx := context.Background() tc := testutil.SystemTest(t) client := setup(t) - client.CreateTopic(ctx, topicID) + createTopic(ctx, client, topicName) buf := new(bytes.Buffer) if err := publish(buf, tc.ProjectID, topicID, "hello world"); err != nil { t.Errorf("failed to publish message: %v", err) @@ -149,7 +136,7 @@ func TestPublishThatScales(t *testing.T) { ctx := context.Background() tc := testutil.SystemTest(t) client := setup(t) - client.CreateTopic(ctx, topicID) + createTopic(ctx, client, topicName) buf := new(bytes.Buffer) if err := publishThatScales(buf, tc.ProjectID, topicID, 10); err != nil { t.Errorf("failed to publish message: %v", err) @@ -160,8 +147,8 @@ func TestPublishWithSettings(t *testing.T) { ctx := context.Background() tc := testutil.SystemTest(t) client := setup(t) - client.CreateTopic(ctx, topicID) - if err := publishWithSettings(ioutil.Discard, tc.ProjectID, topicID); err != nil { + createTopic(ctx, client, topicName) + if err := publishWithSettings(io.Discard, tc.ProjectID, topicID); err != nil { t.Errorf("failed to publish message: %v", err) } } @@ -170,7 +157,7 @@ func TestPublishCustomAttributes(t *testing.T) { ctx := context.Background() tc := testutil.SystemTest(t) client := setup(t) - client.CreateTopic(ctx, topicID) + createTopic(ctx, client, topicName) buf := new(bytes.Buffer) if err := publishCustomAttributes(buf, tc.ProjectID, topicID); err != nil { t.Errorf("failed to publish message: %v", err) @@ -181,7 +168,7 @@ func TestPublishWithRetrySettings(t *testing.T) { ctx := context.Background() tc := testutil.SystemTest(t) client := setup(t) - client.CreateTopic(ctx, topicID) + createTopic(ctx, client, topicName) buf := new(bytes.Buffer) if err := publishWithRetrySettings(buf, tc.ProjectID, topicID, "hello world"); err != nil { t.Errorf("failed to publish message: %v", err) @@ -192,7 +179,7 @@ func TestIAM(t *testing.T) { ctx := context.Background() tc := testutil.SystemTest(t) client := setup(t) - client.CreateTopic(ctx, topicID) + createTopic(ctx, client, topicName) testutil.Retry(t, 10, time.Second, func(r *testutil.R) { buf := new(bytes.Buffer) @@ -206,22 +193,25 @@ func TestIAM(t *testing.T) { }) testutil.Retry(t, 10, time.Second, func(r *testutil.R) { - if err := addUsers(tc.ProjectID, topicID); err != nil { + buf := new(bytes.Buffer) + if err := addUsers(buf, tc.ProjectID, topicID); err != nil { r.Errorf("addUsers: %v", err) } }) testutil.Retry(t, 10, time.Second, func(r *testutil.R) { buf := new(bytes.Buffer) - policy, err := policy(buf, tc.ProjectID, topicID) - if err != nil { - r.Errorf("policy: %v", err) + if err := getIAMPolicy(buf, tc.ProjectID, topicID); err != nil { + r.Errorf("getIAMPolicy: %v", err) } - if role, member := iam.Editor, "group:cloud-logs@google.com"; !policy.HasRole(member, role) { - r.Errorf("want %q as viewer, policy=%v", member, policy) + got := buf.String() + + if !strings.Contains(got, "role: roles/editor, member: group:cloud-logs@google.com") { + r.Errorf("want %s as editor", "group:cloud-logs@google.com") } - if role, member := iam.Viewer, iam.AllUsers; !policy.HasRole(member, role) { - r.Errorf("want %q as viewer, policy=%v", member, policy) + + if !strings.Contains(got, "role: roles/viewer, member: allUsers") { + r.Errorf("want %s as viewer", "allUsers") } }) } @@ -230,7 +220,7 @@ func TestPublishWithOrderingKey(t *testing.T) { ctx := context.Background() tc := testutil.SystemTest(t) client := setup(t) - client.CreateTopic(ctx, topicID) + createTopic(ctx, client, topicName) buf := new(bytes.Buffer) publishWithOrderingKey(buf, tc.ProjectID, topicID) @@ -245,7 +235,7 @@ func TestResumePublishWithOrderingKey(t *testing.T) { ctx := context.Background() tc := testutil.SystemTest(t) client := setup(t) - client.CreateTopic(ctx, topicID) + createTopic(ctx, client, topicName) buf := new(bytes.Buffer) resumePublishWithOrderingKey(buf, tc.ProjectID, topicID) @@ -260,7 +250,7 @@ func TestPublishWithFlowControl(t *testing.T) { ctx := context.Background() tc := testutil.SystemTest(t) client := setup(t) - client.CreateTopic(ctx, topicID) + createTopic(ctx, client, topicName) buf := new(bytes.Buffer) if err := publishWithFlowControlSettings(buf, tc.ProjectID, topicID); err != nil { t.Errorf("failed to publish message: %v", err) @@ -268,33 +258,12 @@ func TestPublishWithFlowControl(t *testing.T) { } func TestDelete(t *testing.T) { - ctx := context.Background() tc := testutil.SystemTest(t) - client := setup(t) - - topic := client.Topic(topicID) - ok, err := topic.Exists(ctx) - if err != nil { - t.Fatalf("failed to check if topic exists: %v", err) - } - if !ok { - _, err := client.CreateTopic(ctx, topicID) - if err != nil { - t.Fatalf("CreateTopic: %v", err) - } - } buf := new(bytes.Buffer) if err := delete(buf, tc.ProjectID, topicID); err != nil { t.Fatalf("failed to delete topic (%q): %v", topicID, err) } - ok, err = client.Topic(topicID).Exists(context.Background()) - if err != nil { - t.Fatalf("failed to check if topic exists: %v", err) - } - if ok { - t.Fatalf("got topic = %q; want none", topicID) - } } func TestTopicKinesisIngestion(t *testing.T) { @@ -324,7 +293,7 @@ func TestTopicCloudStorageIngestion(t *testing.T) { t.Setenv("PUBSUB_EMULATOR_HOST", srv.Addr) // Test creating a cloud storage ingestion topic with Text input format. - if err := createTopicWithCloudStorageIngestion(buf, tc.ProjectID, topicID, "fake-bucket", "**.txt", "2006-01-02T15:04:05Z"); err != nil { + if err := createTopicWithCloudStorageIngestion(buf, tc.ProjectID, topicID, "fake-bucket", "**.txt", "2006-01-02T15:04:05Z", ","); err != nil { t.Fatalf("failed to create a topic with cloud storage ingestion: %v", err) } } @@ -389,9 +358,19 @@ func TestPublishWithCompression(t *testing.T) { ctx := context.Background() tc := testutil.SystemTest(t) client := setup(t) - client.CreateTopic(ctx, topicID) + createTopic(ctx, client, topicName) buf := new(bytes.Buffer) if err := publishWithCompression(buf, tc.ProjectID, topicID); err != nil { t.Errorf("failed to publish message: %v", err) } } + +func createTopic(ctx context.Context, client *pubsub.Client, topicName string) error { + _, err := client.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{ + Name: topicName, + }) + if err != nil { + return fmt.Errorf("failed to create topic: %w", err) + } + return nil +} diff --git a/pubsub/topics/update_topic_type.go b/pubsub/topics/update_topic_type.go index 1185d02654..09c23ae876 100644 --- a/pubsub/topics/update_topic_type.go +++ b/pubsub/topics/update_topic_type.go @@ -20,7 +20,9 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" + "google.golang.org/protobuf/types/known/fieldmaskpb" ) func updateTopicType(w io.Writer, projectID, topicID string) error { @@ -38,18 +40,24 @@ func updateTopicType(w io.Writer, projectID, topicID string) error { } defer client.Close() - updateCfg := pubsub.TopicConfigToUpdate{ - // If wanting to clear ingestion settings, set this to zero value: &pubsub.IngestionDataSourceSettings{} - IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{ - Source: &pubsub.IngestionDataSourceAWSKinesis{ - StreamARN: streamARN, - ConsumerARN: consumerARN, - AWSRoleARN: awsRoleARN, - GCPServiceAccount: gcpServiceAccount, + updateReq := &pubsubpb.UpdateTopicRequest{ + Topic: &pubsubpb.Topic{ + IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{ + Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{ + AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{ + StreamArn: streamARN, + ConsumerArn: consumerARN, + AwsRoleArn: awsRoleARN, + GcpServiceAccount: gcpServiceAccount, + }, + }, }, }, + UpdateMask: &fieldmaskpb.FieldMask{ + Paths: []string{"ingestion_data_source_settings"}, + }, } - topicCfg, err := client.Topic(topicID).Update(ctx, updateCfg) + topicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, updateReq) if err != nil { return fmt.Errorf("topic.Update: %w", err) }