[perf] sqltracestore: use the new schema.
- Update the version of CockroachDB used on the bots.
- Move to using the pgx/v4 client library.
Change-Id: Iea9c24201b2599fc24c6f8bba26b7863256badab
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/306733
Reviewed-by: Joe Gregorio <jcgregorio@google.com>
Commit-Queue: Joe Gregorio <jcgregorio@google.com>
diff --git a/go.mod b/go.mod
index f11cbe9..333dc4e 100644
--- a/go.mod
+++ b/go.mod
@@ -91,6 +91,8 @@
github.com/hashicorp/golang-lru v0.5.4
github.com/huandu/xstrings v1.3.2 // indirect
github.com/imdario/mergo v0.3.10 // indirect
+ github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
+ github.com/jackc/pgx/v4 v4.8.1
github.com/jcgregorio/logger v0.1.2
github.com/jcgregorio/slog v0.0.0-20190423190439-e6f2d537f900
github.com/julienschmidt/httprouter v1.3.0 // indirect
diff --git a/go.sum b/go.sum
index edc684e..38a7756 100644
--- a/go.sum
+++ b/go.sum
@@ -119,6 +119,8 @@
github.com/Jeffail/gabs/v2 v2.5.0/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI=
github.com/Jeffail/gabs/v2 v2.5.1 h1:ANfZYjpMlfTTKebycu4X1AgkVWumFVDYQl7JwOr4mDk=
github.com/Jeffail/gabs/v2 v2.5.1/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI=
+github.com/Kount/pq-timeouts v1.0.0 h1:6a23dhwmQ2PukftCWm56T4RPJ4zc2iE9y5E42TMAl6E=
+github.com/Kount/pq-timeouts v1.0.0/go.mod h1:Y7rNVWI9KiI3xj1QxBmOSB12Eyv9g5Gjego8KFpV5PY=
github.com/Masterminds/goutils v1.1.0 h1:zukEsf/1JZwCMgHiK3GZftabmxiCw4apj3a28RPBiVg=
github.com/Masterminds/goutils v1.1.0/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
@@ -250,6 +252,7 @@
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
+github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/cockroachdb/cockroach-go v0.0.0-20190925194419-606b3d062051 h1:eApuUG8W2EtBVwxqLlY2wgoqDYOg3WvIHGvW4fUbbow=
github.com/cockroachdb/cockroach-go v0.0.0-20190925194419-606b3d062051/go.mod h1:XGLbWH/ujMcbPbhZq52Nv6UrCghb1yGn//133kEsvDk=
@@ -353,6 +356,9 @@
github.com/gocql/gocql v0.0.0-20190301043612-f6df8288f9b4/go.mod h1:4Fw1eo5iaEhDUs8XyuhSVCVy52Jq3L+/3GJgYkwc+/0=
github.com/godbus/dbus v0.0.0-20181101234600-2ff6f7ffd60f h1:zlOR3rOlPAVvtfuxGKoghCmop5B0TRyu/ZieziZuGiM=
github.com/godbus/dbus v0.0.0-20181101234600-2ff6f7ffd60f/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw=
+github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
+github.com/gofrs/uuid v3.3.0+incompatible h1:8K4tyRfvU1CYPgJsveYFQMhpFd/wXNM7iK6rR7UHz84=
+github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
@@ -551,11 +557,18 @@
github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8=
github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
+github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc=
+github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ=
github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA=
github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE=
github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsUgOEh9hBm+xYTstcNHg7UPMVJqRfQxq4s=
github.com/jackc/pgconn v1.3.2 h1:9UIGICxEAW70RQDGilGwsCG63NCcm5amjuBQCFzrmsw=
github.com/jackc/pgconn v1.3.2/go.mod h1:LvCquS3HbBKwgl7KbX9KyqEIumJAbm1UMcTvGaIf3bM=
+github.com/jackc/pgconn v1.4.0/go.mod h1:Y2O3ZDF0q4mMacyWV3AstPJpeHXWGEetiFttmq5lahk=
+github.com/jackc/pgconn v1.5.0/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI=
+github.com/jackc/pgconn v1.5.1-0.20200601181101-fa742c524853/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI=
+github.com/jackc/pgconn v1.6.4 h1:S7T6cx5o2OqmxdHaXLH1ZeD1SbI8jBznyYE9Ec0RCQ8=
+github.com/jackc/pgconn v1.6.4/go.mod h1:w2pne1C2tZgP+TvjqLpOigGzNqjBgQW9dUw/4Chex78=
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2 h1:JVX6jT/XfzNqIjye4717ITLaNwV9mWbJx0dLCpcRzdA=
@@ -570,14 +583,36 @@
github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM=
github.com/jackc/pgproto3/v2 v2.0.1 h1:Rdjp4NFjwHnEslx2b66FfCI2S0LhO4itac3hXz6WX9M=
github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
+github.com/jackc/pgproto3/v2 v2.0.2 h1:q1Hsy66zh4vuNsajBUF2PNqfAMMfxU5mk594lPE9vjY=
+github.com/jackc/pgproto3/v2 v2.0.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
+github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E=
+github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg=
+github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E=
github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg=
github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc=
+github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59 h1:xOamcCJ9MFJTxR5bvw3ZXmiP8evQMohdt2VJ57C0W8Q=
github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw=
+github.com/jackc/pgtype v1.2.0/go.mod h1:5m2OfMh1wTK7x+Fk952IDmI4nw3nPrvtQdM0ZT4WpC0=
+github.com/jackc/pgtype v1.3.1-0.20200510190516-8cd94a14c75a/go.mod h1:vaogEUkALtxZMCH411K+tKzNpwzCKU+AnPzBKZ+I+Po=
+github.com/jackc/pgtype v1.3.1-0.20200606141011-f6355165a91c/go.mod h1:cvk9Bgu/VzJ9/lxTO5R5sf80p0DiucVtN7ZxvaC4GmQ=
+github.com/jackc/pgtype v1.4.2 h1:t+6LWm5eWPLX1H5Se702JSBcirq6uWa4jiG4wV1rAWY=
+github.com/jackc/pgtype v1.4.2/go.mod h1:JCULISAZBFGrHaOXIIFiyfzW5VY0GRitRr8NeJsrdig=
+github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o=
+github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I=
github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y=
github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM=
+github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186 h1:ZQM8qLT/E/CGD6XX0E6q9FAwxJYmWpJufzmLMaFuzgQ=
github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc=
+github.com/jackc/pgx/v4 v4.5.0/go.mod h1:EpAKPLdnTorwmPUUsqrPxy5fphV18j9q3wrfRXgo+kA=
+github.com/jackc/pgx/v4 v4.6.1-0.20200510190926-94ba730bb1e9/go.mod h1:t3/cdRQl6fOLDxqtlyhe9UWgfIi9R8+8v8GKV5TRA/o=
+github.com/jackc/pgx/v4 v4.6.1-0.20200606145419-4e5062306904/go.mod h1:ZDaNWkt9sW1JMiNn0kdYBaLelIhw7Pg4qd+Vk6tw7Hg=
+github.com/jackc/pgx/v4 v4.8.1 h1:SUbCLP2pXvf/Sr/25KsuI4aTxiFYIvpfk4l6aTSdyCw=
+github.com/jackc/pgx/v4 v4.8.1/go.mod h1:4HOLxrl8wToZJReD04/yB20GDwf4KBYETvlHciCnwW0=
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
+github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
+github.com/jackc/puddle v1.1.1 h1:PJAw7H/9hoWC4Kf3J8iNmL1SwA6E8vfsLqBiL+F6CtI=
+github.com/jackc/puddle v1.1.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jcgregorio/logger v0.1.2 h1:kHiF857oOObzlUer5ANZ95U08A7k2INjivnss4IyMCg=
@@ -656,9 +691,14 @@
github.com/maruel/ut v1.0.1/go.mod h1:RV8PwPD9dd2KFlnlCc/DB2JVvkXmyaalfc5xvmSrRSs=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
+github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
+github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
+github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
+github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
+github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.3 h1:a+kO+98RDGEfo6asOGMmpodZq4FNtnGP54yps8BzLR4=
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
@@ -839,13 +879,16 @@
github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
+github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
+github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
+github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
@@ -1048,7 +1091,10 @@
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
+go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@@ -1064,6 +1110,7 @@
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191117063200-497ca9f6d64f/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@@ -1071,6 +1118,7 @@
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073 h1:xMPOj6Pz6UipU1wXLkrtqpHbR0AVFnyPEQq/wRWz9lM=
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200420201142-3c4aac89819a h1:y6sBfNd1b9Wy08a6K1Z1DZc4aXABUN5TKjkYhz7UKmo=
golang.org/x/crypto v0.0.0-20200420201142-3c4aac89819a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200429183012-4b2356b1ed79 h1:IaQbIIB2X/Mp/DKctl6ROxz1KyMlKp4uyvL6+kQ7C88=
@@ -1241,6 +1289,7 @@
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -1251,6 +1300,7 @@
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1 h1:gZpLHxUX5BdYLA08Lj4YCJNN/jk7KtquiArPoeX0WvA=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82 h1:ywK/j/KkyTHcdyYSZNXGjMwgmDSfjglYZ3vStQ/gSCU=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -1345,6 +1395,8 @@
golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191010075000-0337d82405ff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191113191852-77e3bb0ad9e7/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
diff --git a/go/cipd/asset_versions_gen.go b/go/cipd/asset_versions_gen.go
index 969c18a..2df0aa6 100644
--- a/go/cipd/asset_versions_gen.go
+++ b/go/cipd/asset_versions_gen.go
@@ -86,7 +86,7 @@
"skia/bots/cockroachdb": {
Path: "cockroachdb",
Name: "skia/bots/cockroachdb",
- Version: "version:2",
+ Version: "version:3",
},
"skia/bots/gcloud_linux": {
Path: "gcloud_linux",
diff --git a/infra/bots/assets/cockroachdb/VERSION b/infra/bots/assets/cockroachdb/VERSION
index 0cfbf08..00750ed 100644
--- a/infra/bots/assets/cockroachdb/VERSION
+++ b/infra/bots/assets/cockroachdb/VERSION
@@ -1 +1 @@
-2
+3
diff --git a/infra/bots/tasks.json b/infra/bots/tasks.json
index f03d0ca..997d52e 100755
--- a/infra/bots/tasks.json
+++ b/infra/bots/tasks.json
@@ -940,7 +940,7 @@
{
"name": "skia/bots/cockroachdb",
"path": "cockroachdb",
- "version": "version:2"
+ "version": "version:3"
},
{
"name": "skia/bots/gcloud_linux",
@@ -1367,7 +1367,7 @@
{
"name": "skia/bots/cockroachdb",
"path": "cockroachdb",
- "version": "version:2"
+ "version": "version:3"
},
{
"name": "skia/bots/gcloud_linux",
diff --git a/perf/COCKROACHDB.md b/perf/COCKROACHDB.md
index 0234b69..6e8cb3a 100644
--- a/perf/COCKROACHDB.md
+++ b/perf/COCKROACHDB.md
@@ -52,7 +52,7 @@
Now port-forwarding the database port for the cockroachdb instance, for example:
- kubectl port-forward perf-skia-cockroachdb-0 26257
+ kubectl port-forward perf-cockroachdb-0 26257
Then run the migrations:
diff --git a/perf/go/alerts/sqlalertstore/sqlalertstore.go b/perf/go/alerts/sqlalertstore/sqlalertstore.go
index 2f256cd..6b96941 100644
--- a/perf/go/alerts/sqlalertstore/sqlalertstore.go
+++ b/perf/go/alerts/sqlalertstore/sqlalertstore.go
@@ -5,11 +5,11 @@
import (
"context"
- "database/sql"
"encoding/json"
"fmt"
"time"
+ "github.com/jackc/pgx/v4/pgxpool"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/perf/go/alerts"
perfsql "go.skia.org/infra/perf/go/sql"
@@ -27,25 +27,21 @@
listAllAlerts
)
-// statements allows looking up raw SQL statements by their statement id.
-type statements map[statement]string
-
-// statementsByDialect holds all the raw SQL statemens used per Dialect of SQL.
-var statementsByDialect = map[perfsql.Dialect]statements{
- perfsql.CockroachDBDialect: {
- insertAlert: `
+// statements holds all the raw SQL statements used.
+var statements = map[statement]string{
+ insertAlert: `
INSERT INTO
Alerts (alert, last_modified)
VALUES
($1, $2)
`,
- updateAlert: `
+ updateAlert: `
UPSERT INTO
Alerts (id, alert, config_state, last_modified)
VALUES
($1, $2, $3, $4)
`,
- deleteAlert: `
+ deleteAlert: `
UPDATE
Alerts
SET
@@ -54,7 +50,7 @@
WHERE
id=$2
`,
- listActiveAlerts: `
+ listActiveAlerts: `
SELECT
id, alert
FROM
@@ -62,37 +58,28 @@
WHERE
config_state=0 -- alerts.ACTIVE
`,
- listAllAlerts: `
+ listAllAlerts: `
SELECT
id, alert
FROM
ALERTS
`,
- },
}
// SQLAlertStore implements the alerts.Store interface.
type SQLAlertStore struct {
- // preparedStatements are all the prepared SQL statements.
- preparedStatements map[statement]*sql.Stmt
+ // db is the database interface.
+ db *pgxpool.Pool
}
// New returns a new *SQLAlertStore.
//
// We presume all migrations have been run against db before this function is
// called.
-func New(db *sql.DB, dialect perfsql.Dialect) (*SQLAlertStore, error) {
- preparedStatements := map[statement]*sql.Stmt{}
- for key, statement := range statementsByDialect[dialect] {
- prepared, err := db.Prepare(statement)
- if err != nil {
- return nil, skerr.Wrapf(err, "Failed to prepare statment %v %q", key, statement)
- }
- preparedStatements[key] = prepared
- }
+func New(db *pgxpool.Pool, dialect perfsql.Dialect) (*SQLAlertStore, error) {
return &SQLAlertStore{
- preparedStatements: preparedStatements,
+ db: db,
}, nil
}
@@ -106,11 +93,11 @@
now := time.Now().Unix()
if cfg.ID == alerts.BadAlertID {
// Not a valid ID, so this should be an insert, not an update.
- if _, err := s.preparedStatements[insertAlert].ExecContext(ctx, string(b), now); err != nil {
+ if _, err := s.db.Exec(ctx, statements[insertAlert], string(b), now); err != nil {
return skerr.Wrapf(err, "Failed to insert alert")
}
} else {
- if _, err := s.preparedStatements[updateAlert].ExecContext(ctx, cfg.ID, string(b), cfg.StateToInt(), now); err != nil {
+ if _, err := s.db.Exec(ctx, statements[updateAlert], cfg.ID, string(b), cfg.StateToInt(), now); err != nil {
return skerr.Wrapf(err, "Failed to update Alert with ID=%d", cfg.ID)
}
}
@@ -120,7 +107,7 @@
// Delete implements the alerts.Store interface.
func (s *SQLAlertStore) Delete(ctx context.Context, id int) error {
now := time.Now().Unix()
- if _, err := s.preparedStatements[deleteAlert].ExecContext(ctx, now, id); err != nil {
+ if _, err := s.db.Exec(ctx, statements[deleteAlert], now, id); err != nil {
return skerr.Wrapf(err, "Failed to mark Alert as deleted with ID=%d", id)
}
return nil
@@ -132,7 +119,7 @@
if includeDeleted {
stmt = listAllAlerts
}
- rows, err := s.preparedStatements[stmt].QueryContext(ctx)
+ rows, err := s.db.Query(ctx, statements[stmt])
if err != nil {
return nil, err
}
diff --git a/perf/go/builders/builders.go b/perf/go/builders/builders.go
index f0e06c8..bb2c5fc 100644
--- a/perf/go/builders/builders.go
+++ b/perf/go/builders/builders.go
@@ -11,6 +11,8 @@
"cloud.google.com/go/bigtable"
"cloud.google.com/go/datastore"
+ "github.com/jackc/pgx/v4/pgxpool"
+ _ "github.com/jackc/pgx/v4/stdlib" // Register the pgx driver for database/sql, used when applying migrations.
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/ds"
"go.skia.org/infra/go/skerr"
@@ -41,7 +43,7 @@
// newCockroachDBFromConfig opens an existing CockroachDB database with all
// migrations applied.
-func newCockroachDBFromConfig(instanceConfig *config.InstanceConfig) (*sql.DB, error) {
+func newCockroachDBFromConfig(ctx context.Context, instanceConfig *config.InstanceConfig) (*pgxpool.Pool, error) {
// Note that the migrationsConnection is different from the sql.Open
// connection string since migrations know about CockroachDB, but we use the
// Postgres driver for the database/sql connection since there's no native
@@ -50,7 +52,7 @@
// uses.
migrationsConnection := strings.Replace(instanceConfig.DataStoreConfig.ConnectionString, "postgresql://", "cockroach://", 1)
- db, err := sql.Open("postgres", instanceConfig.DataStoreConfig.ConnectionString)
+ db, err := sql.Open("pgx", instanceConfig.DataStoreConfig.ConnectionString)
if err != nil {
return nil, skerr.Wrap(err)
}
@@ -62,8 +64,12 @@
if err != nil {
return nil, skerr.Wrap(err)
}
+ if err := db.Close(); err != nil {
+ return nil, skerr.Wrap(err)
+ }
sklog.Infof("Finished applying migrations.")
- return db, nil
+
+ return pgxpool.Connect(ctx, instanceConfig.DataStoreConfig.ConnectionString)
}
// NewPerfGitFromConfig return a new perfgit.Git for the given instanceConfig.
@@ -96,7 +102,7 @@
sklog.Infof("Constructing perfgit with dialect: %q and connection_string: %q", dialect, instanceConfig.DataStoreConfig.ConnectionString)
// Now create the appropriate db.
- db, err := newCockroachDBFromConfig(instanceConfig)
+ db, err := newCockroachDBFromConfig(ctx, instanceConfig)
if err != nil {
return nil, skerr.Wrap(err)
}
@@ -124,11 +130,11 @@
}
return traceStore, nil
case config.CockroachDBDataStoreType:
- db, err := newCockroachDBFromConfig(instanceConfig)
+ db, err := newCockroachDBFromConfig(ctx, instanceConfig)
if err != nil {
return nil, skerr.Wrap(err)
}
- return sqltracestore.New(db, perfsql.CockroachDBDialect, instanceConfig.DataStoreConfig)
+ return sqltracestore.New(db, instanceConfig.DataStoreConfig)
}
return nil, skerr.Fmt("Unknown datastore type: %q", instanceConfig.DataStoreConfig.DataStoreType)
}
@@ -161,7 +167,7 @@
}
return dsalertstore.New(), nil
case config.CockroachDBDataStoreType:
- db, err := newCockroachDBFromConfig(instanceConfig)
+ db, err := newCockroachDBFromConfig(ctx, instanceConfig)
if err != nil {
return nil, skerr.Wrap(err)
}
@@ -190,11 +196,11 @@
}
return dsregressionstore.NewRegressionStoreDS(lookup), nil
case config.CockroachDBDataStoreType:
- db, err := newCockroachDBFromConfig(instanceConfig)
+ db, err := newCockroachDBFromConfig(ctx, instanceConfig)
if err != nil {
return nil, skerr.Wrap(err)
}
- return sqlregressionstore.New(db, perfsql.CockroachDBDialect)
+ return sqlregressionstore.New(db)
}
return nil, skerr.Fmt("Unknown datastore type: %q", instanceConfig.DataStoreConfig.DataStoreType)
}
@@ -210,7 +216,7 @@
return dsshortcutstore.New(), nil
case config.CockroachDBDataStoreType:
- db, err := newCockroachDBFromConfig(instanceConfig)
+ db, err := newCockroachDBFromConfig(ctx, instanceConfig)
if err != nil {
return nil, skerr.Wrap(err)
}
diff --git a/perf/go/builders/builders_test.go b/perf/go/builders/builders_test.go
index c3ceac7..f718f7d 100644
--- a/perf/go/builders/builders_test.go
+++ b/perf/go/builders/builders_test.go
@@ -220,7 +220,7 @@
func TestNewPerfGitFromConfig_ErrIfConnectionStringNotSet(t *testing.T) {
unittest.LargeTest(t)
- ctx, _, _, _, _, instanceConfig, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, _, _, _, _, instanceConfig, _, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
defer cleanup()
instanceConfig.DataStoreConfig.DataStoreType = config.GCPDataStoreType
@@ -232,7 +232,7 @@
func TestNewPerfGitFromConfig_GCP_ErrorsOnNonPostgresConnectionString(t *testing.T) {
unittest.LargeTest(t)
- ctx, _, _, _, _, instanceConfig, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, _, _, _, _, instanceConfig, _, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
defer cleanup()
instanceConfig.DataStoreConfig.DataStoreType = config.GCPDataStoreType
@@ -252,11 +252,11 @@
func TestNewPerfGitFromConfig_GCP_SuccessIfConnectionStringIsCockroachDB(t *testing.T) {
unittest.LargeTest(t)
- ctx, _, _, hashes, _, instanceConfig, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, _, _, hashes, _, instanceConfig, dbName, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
defer cleanup()
instanceConfig.DataStoreConfig.DataStoreType = config.GCPDataStoreType
- instanceConfig.DataStoreConfig.ConnectionString = fmt.Sprintf("postgresql://root@%s/%s?sslmode=disable", perfsql.GetCockroachDBEmulatorHost(), gittest.CockroachDatabaseName)
+ instanceConfig.DataStoreConfig.ConnectionString = fmt.Sprintf("postgresql://root@%s/%s?sslmode=disable", perfsql.GetCockroachDBEmulatorHost(), dbName)
g, err := NewPerfGitFromConfig(ctx, false, instanceConfig)
require.NoError(t, err)
@@ -268,7 +268,7 @@
func TestNewPerfGitFromConfig_CockroachDB_Success(t *testing.T) {
unittest.LargeTest(t)
- ctx, _, _, hashes, _, instanceConfig, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, _, _, hashes, _, instanceConfig, _, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
defer cleanup()
instanceConfig.DataStoreConfig.DataStoreType = config.CockroachDBDataStoreType
diff --git a/perf/go/cid/cid_test.go b/perf/go/cid/cid_test.go
index 0fa0b81..4ed62a1 100644
--- a/perf/go/cid/cid_test.go
+++ b/perf/go/cid/cid_test.go
@@ -121,7 +121,7 @@
func TestCommitIDLookup_Success(t *testing.T) {
unittest.LargeTest(t)
- ctx, db, _, hashes, dialect, instanceConfig, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, db, _, hashes, dialect, instanceConfig, _, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
defer cleanup()
g, err := perfgit.New(ctx, true, db, dialect, instanceConfig)
require.NoError(t, err)
@@ -161,7 +161,7 @@
func TestCommitIDLookup_ErrOnBadCommit(t *testing.T) {
unittest.LargeTest(t)
- ctx, db, _, _, dialect, instanceConfig, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, db, _, _, dialect, instanceConfig, _, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
defer cleanup()
g, err := perfgit.New(ctx, true, db, dialect, instanceConfig)
require.NoError(t, err)
diff --git a/perf/go/config/config.go b/perf/go/config/config.go
index eabead4..7c9a936 100644
--- a/perf/go/config/config.go
+++ b/perf/go/config/config.go
@@ -36,24 +36,6 @@
CockroachDBDataStoreType DataStoreType = "cockroachdb"
)
-// Cache is the configuration for the LRU cache used by the storage client.
-type Cache struct {
- // MemcachedServers is a list of memcached server names and addresses, e.g.
- // ['memcached-0:11211', 'memcached-1:11211', 'memcached-2:11211']
- //
- // If this list is empty then the server will fall back to using an
- // in-memory LRU cache per instance.
- MemcachedServers []string `json:"memcached_servers"`
-
- // Namespace is the string to add to each key to avoid conflicts with more
- // than one application or application instance using the same memcached
- // server.
- Namespace string `json:"namespace"`
-
- // Size is the size of the LRU cache to use if memcached isn't being used.
- Size int `json:"size"`
-}
-
// DataStoreConfig is the configuration for how Perf stores data.
type DataStoreConfig struct {
// DataStoreType determines what type of datastore to build. This value will
@@ -97,9 +79,6 @@
// regressions, and shortcuts should use. This value is only used for 'gcp'
// datastore types.
Namespace string `json:"namespace"`
-
- // Cache is the configuration for the LRU cache used by the storage client.
- Cache Cache `json:"cache"`
}
// SourceType determines what type of file.Source to build from a SourceConfig.
@@ -271,9 +250,10 @@
// IngestFlags are the command-line flags for the ingestion process.
type IngestFlags struct {
- InstanceConfigFile string
- PromPort string
- Local bool
+ InstanceConfigFile string
+ PromPort string
+ Local bool
+ NumParallelIngesters int
}
// Register the flags in the given FlagSet.
@@ -281,6 +261,7 @@
fs.StringVar(&flags.InstanceConfigFile, "config_filename", "", "Instance config file. Must be supplied.")
fs.StringVar(&flags.PromPort, "prom_port", ":20000", "Metrics service address (e.g., ':20000')")
fs.BoolVar(&flags.Local, "local", false, "True if running locally and not in production.")
+ fs.IntVar(&flags.NumParallelIngesters, "num_parallel_ingesters", 10, "The number of parallel Go routines to have ingesting.")
}
// InstanceConfig contains all the info needed by btts.BigTableTraceStore.
diff --git a/perf/go/dataframe/async_test.go b/perf/go/dataframe/async_test.go
index 7c8b98f..87aff52 100644
--- a/perf/go/dataframe/async_test.go
+++ b/perf/go/dataframe/async_test.go
@@ -148,7 +148,7 @@
func TestGetSkps_Success(t *testing.T) {
unittest.LargeTest(t)
- ctx, db, _, _, dialect, instanceConfig, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, db, _, _, dialect, instanceConfig, _, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
defer cleanup()
g, err := perfgit.New(ctx, true, db, dialect, instanceConfig)
require.NoError(t, err)
@@ -170,7 +170,7 @@
func TestGetSkps_SuccessIfFileChangeMarkerNotSet(t *testing.T) {
unittest.LargeTest(t)
- ctx, db, _, _, dialect, instanceConfig, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, db, _, _, dialect, instanceConfig, _, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
defer cleanup()
g, err := perfgit.New(ctx, true, db, dialect, instanceConfig)
require.NoError(t, err)
@@ -192,7 +192,7 @@
func TestGetSkps_ErrOnBadCommitNumber(t *testing.T) {
unittest.LargeTest(t)
- ctx, db, _, _, dialect, instanceConfig, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, db, _, _, dialect, instanceConfig, _, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
defer cleanup()
g, err := perfgit.New(ctx, true, db, dialect, instanceConfig)
require.NoError(t, err)
diff --git a/perf/go/dataframe/dataframe_test.go b/perf/go/dataframe/dataframe_test.go
index 27f4139..0b63e13 100644
--- a/perf/go/dataframe/dataframe_test.go
+++ b/perf/go/dataframe/dataframe_test.go
@@ -134,7 +134,7 @@
func TestFromTimeRange_Success(t *testing.T) {
unittest.LargeTest(t)
- ctx, db, _, _, dialect, instanceConfig, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, db, _, _, dialect, instanceConfig, _, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
defer cleanup()
g, err := perfgit.New(ctx, true, db, dialect, instanceConfig)
require.NoError(t, err)
@@ -156,7 +156,7 @@
func TestFromTimeRange_EmptySlicesIfNothingInTimeRange(t *testing.T) {
unittest.LargeTest(t)
- ctx, db, _, _, dialect, instanceConfig, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, db, _, _, dialect, instanceConfig, _, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
defer cleanup()
g, err := perfgit.New(ctx, true, db, dialect, instanceConfig)
require.NoError(t, err)
diff --git a/perf/go/dfbuilder/dfbuilder_test.go b/perf/go/dfbuilder/dfbuilder_test.go
index 625ad46..64aa91d 100644
--- a/perf/go/dfbuilder/dfbuilder_test.go
+++ b/perf/go/dfbuilder/dfbuilder_test.go
@@ -2,6 +2,8 @@
import (
"context"
+ "fmt"
+ "math/rand"
"net/url"
"testing"
"time"
@@ -34,15 +36,14 @@
}
)
-const CockroachDatabaseName = "dfbuilder"
-
func TestBuildTraceMapper(t *testing.T) {
unittest.LargeTest(t)
- db, cleanup := sqltest.NewCockroachDBForTests(t, CockroachDatabaseName, sqltest.ApplyMigrations)
+ dbName := fmt.Sprintf("dfbuilder%d", rand.Uint32())
+ db, cleanup := sqltest.NewCockroachDBForTests(t, dbName, sqltest.ApplyMigrations)
defer cleanup()
- store, err := sqltracestore.New(db, perfsql.CockroachDBDialect, cfg.DataStoreConfig)
+ store, err := sqltracestore.New(db, cfg.DataStoreConfig)
require.NoError(t, err)
tileMap := buildTileMapOffsetToIndex([]types.CommitNumber{0, 1, 255, 256, 257}, store)
@@ -75,14 +76,14 @@
unittest.LargeTest(t)
ctx := context.Background()
- ctx, db, _, _, dialect, instanceConfig, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, db, _, _, dialect, instanceConfig, _, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
defer cleanup()
g, err := perfgit.New(ctx, true, db, dialect, instanceConfig)
require.NoError(t, err)
instanceConfig.DataStoreConfig.TileSize = 6
- store, err := sqltracestore.New(db, perfsql.CockroachDBDialect, instanceConfig.DataStoreConfig)
+ store, err := sqltracestore.New(db, instanceConfig.DataStoreConfig)
require.NoError(t, err)
builder := NewDataFrameBuilderFromTraceStore(g, store)
@@ -113,7 +114,7 @@
now := gittest.StartTime.Add(7 * time.Minute)
df, err := builder.NewFromQueryAndRange(ctx, now.Add(-7*time.Minute), now.Add(time.Second), q, false, nil)
- assert.NoError(t, err)
+ require.NoError(t, err)
assert.Len(t, df.TraceSet, 2)
assert.Len(t, df.Header, 8)
assert.Len(t, df.TraceSet[",arch=x86,config=8888,"], 8)
@@ -206,7 +207,7 @@
func TestFromIndexRange_Success(t *testing.T) {
unittest.LargeTest(t)
- ctx, db, _, _, dialect, instanceConfig, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, db, _, _, dialect, instanceConfig, _, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
defer cleanup()
g, err := perfgit.New(ctx, true, db, dialect, instanceConfig)
require.NoError(t, err)
@@ -232,7 +233,7 @@
func TestFromIndexRange_EmptySliceOnBadCommitNumber(t *testing.T) {
unittest.LargeTest(t)
- ctx, db, _, _, dialect, instanceConfig, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, db, _, _, dialect, instanceConfig, _, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
defer cleanup()
g, err := perfgit.New(ctx, true, db, dialect, instanceConfig)
require.NoError(t, err)
diff --git a/perf/go/git/git.go b/perf/go/git/git.go
index 96764aa..6c3767d 100644
--- a/perf/go/git/git.go
+++ b/perf/go/git/git.go
@@ -8,7 +8,6 @@
import (
"bufio"
"context"
- "database/sql"
"io"
"os"
"os/exec"
@@ -17,6 +16,8 @@
"strings"
"time"
+ "github.com/jackc/pgx/v4"
+ "github.com/jackc/pgx/v4/pgxpool"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/gitauth"
"go.skia.org/infra/go/metrics2"
@@ -49,13 +50,10 @@
}
)
-// statements allows looking up raw SQL statements by their statement id.
-type statements map[statement]string
+// statements holds all the raw SQL statements used by Git.
+var statements = map[statement]string{
-// statementsByDialect holds all the raw SQL statemens used per Dialect of SQL.
-var statementsByDialect = map[perfsql.Dialect]statements{
- perfsql.CockroachDBDialect: {
- getMostRecentGitHashAndCommitNumber: `
+ getMostRecentGitHashAndCommitNumber: `
SELECT
git_hash, commit_number
FROM
@@ -65,7 +63,7 @@
LIMIT
1
`,
- insert: `
+ insert: `
INSERT INTO
Commits (commit_number, git_hash, commit_time, author, subject)
VALUES
@@ -73,14 +71,14 @@
ON CONFLICT
DO NOTHING
`,
- getCommitNumberFromGitHash: `
+ getCommitNumberFromGitHash: `
SELECT
commit_number
FROM
Commits
WHERE
git_hash=$1`,
- getCommitNumberFromTime: `
+ getCommitNumberFromTime: `
SELECT
commit_number
FROM
@@ -92,7 +90,7 @@
LIMIT
1
`,
- getCommitsFromTimeRange: `
+ getCommitsFromTimeRange: `
SELECT
commit_number, git_hash, commit_time, author, subject
FROM
@@ -103,7 +101,7 @@
ORDER BY
commit_number ASC
`,
- getCommitsFromCommitNumberRange: `
+ getCommitsFromCommitNumberRange: `
SELECT
commit_number, git_hash, commit_time, author, subject
FROM
@@ -114,7 +112,7 @@
ORDER BY
commit_number ASC
`,
- getCommitFromCommitNumber: `
+ getCommitFromCommitNumber: `
SELECT
commit_number, git_hash, commit_time, author, subject
FROM
@@ -122,7 +120,7 @@
WHERE
commit_number = $1
`,
- getHashFromCommitNumber: `
+ getHashFromCommitNumber: `
SELECT
git_hash
FROM
@@ -130,7 +128,6 @@
WHERE
commit_number=$1
`,
- },
}
// Git implements the minimal functionality Perf needs to interface to Git.
@@ -146,8 +143,7 @@
instanceConfig *config.InstanceConfig
- // preparedStatements are all the prepared SQL statements.
- preparedStatements map[statement]*sql.Stmt
+ db *pgxpool.Pool
// Metrics
updateCalled metrics2.Counter
@@ -164,7 +160,7 @@
//
// The instance created does not poll by default, callers need to call
// StartBackgroundPolling().
-func New(ctx context.Context, local bool, db *sql.DB, dialect perfsql.Dialect, instanceConfig *config.InstanceConfig) (*Git, error) {
+func New(ctx context.Context, local bool, db *pgxpool.Pool, dialect perfsql.Dialect, instanceConfig *config.InstanceConfig) (*Git, error) {
// Do git authentication if required.
if instanceConfig.GitRepoConfig.GitAuthType == config.GitAuthGerrit {
sklog.Info("Authenticating to Gerrit.")
@@ -199,28 +195,17 @@
}
}
- // Prepare all our SQL statements.
- sklog.Infof("Preparing statements.")
- preparedStatements := map[statement]*sql.Stmt{}
- for key, statement := range statementsByDialect[dialect] {
- prepared, err := db.Prepare(statement)
- if err != nil {
- return nil, skerr.Wrapf(err, "Failed to prepare statment %v %q", key, statement)
- }
- preparedStatements[key] = prepared
- }
-
ret := &Git{
- gitFullPath: gitFullPath,
- preparedStatements: preparedStatements,
- instanceConfig: instanceConfig,
- updateCalled: metrics2.GetCounter("perf_git_update_called"),
- commitNumberFromGitHashCalled: metrics2.GetCounter("perf_git_commit_number_from_githash_called"),
- commitNumberFromTimeCalled: metrics2.GetCounter("perf_git_commit_number_from_time_called"),
- commitSliceFromTimeRangeCalled: metrics2.GetCounter("perf_git_commits_slice_from_time_range_called"),
- commitSliceFromCommitNumberRangeCalled: metrics2.GetCounter("perf_git_commits_slice_from_commit_number_range_called"),
- commitFromCommitNumberCalled: metrics2.GetCounter("perf_git_commit_from_commit_number_called"),
- gitHashFromCommitNumberCalled: metrics2.GetCounter("perf_git_githash_from_commit_number_called"),
+ gitFullPath: gitFullPath,
+ db: db,
+ instanceConfig: instanceConfig,
+ updateCalled: metrics2.GetCounter("perf_git_update_called"),
+ commitNumberFromGitHashCalled: metrics2.GetCounter("perf_git_commit_number_from_githash_called"),
+ commitNumberFromTimeCalled: metrics2.GetCounter("perf_git_commit_number_from_time_called"),
+ commitSliceFromTimeRangeCalled: metrics2.GetCounter("perf_git_commits_slice_from_time_range_called"),
+ commitSliceFromCommitNumberRangeCalled: metrics2.GetCounter("perf_git_commits_slice_from_commit_number_range_called"),
+ commitFromCommitNumberCalled: metrics2.GetCounter("perf_git_commit_from_commit_number_called"),
+ gitHashFromCommitNumberCalled: metrics2.GetCounter("perf_git_githash_from_commit_number_called"),
commitNumbersWhenFileChangesInCommitNumberRangeCalled: metrics2.GetCounter("perf_git_commit_numbers_when_file_changes_in_commit_number_range_called"),
}
@@ -362,7 +347,7 @@
if err != nil {
// If the Commits table is empty then start populating it from the very
// first commit to the repo.
- if err == sql.ErrNoRows {
+ if err == pgx.ErrNoRows {
cmd = exec.CommandContext(ctx, g.gitFullPath, "rev-list", "HEAD", `--pretty=%aN <%aE>%n%s%n%ct`, "--reverse")
nextCommitNumber = types.CommitNumber(0)
} else {
@@ -386,7 +371,7 @@
total := 0
err = parseGitRevLogStream(stdout, func(p Commit) error {
// Add p to the database starting at nextCommitNumber.
- _, err := g.preparedStatements[insert].ExecContext(ctx, nextCommitNumber, p.GitHash, p.Timestamp, p.Author, p.Subject)
+ _, err := g.db.Exec(ctx, statements[insert], nextCommitNumber, p.GitHash, p.Timestamp, p.Author, p.Subject)
if err != nil {
return skerr.Wrapf(err, "Failed to insert commit %q into database.", p.GitHash)
}
@@ -415,7 +400,7 @@
func (g *Git) getMostRecentCommit(ctx context.Context) (string, types.CommitNumber, error) {
var gitHash string
var commitNumber types.CommitNumber
- if err := g.preparedStatements[getMostRecentGitHashAndCommitNumber].QueryRowContext(ctx).Scan(&gitHash, &commitNumber); err != nil {
+ if err := g.db.QueryRow(ctx, statements[getMostRecentGitHashAndCommitNumber]).Scan(&gitHash, &commitNumber); err != nil {
// Don't wrap the err, we need to see if it's sql.ErrNoRows.
return "", types.BadCommitNumber, err
}
@@ -426,7 +411,7 @@
func (g *Git) CommitNumberFromGitHash(ctx context.Context, githash string) (types.CommitNumber, error) {
g.commitNumberFromGitHashCalled.Inc(1)
ret := types.BadCommitNumber
- if err := g.preparedStatements[getCommitNumberFromGitHash].QueryRowContext(ctx, githash).Scan(&ret); err != nil {
+ if err := g.db.QueryRow(ctx, statements[getCommitNumberFromGitHash], githash).Scan(&ret); err != nil {
return ret, skerr.Wrapf(err, "Failed get for hash: %q", githash)
}
return ret, nil
@@ -445,7 +430,7 @@
_, mostRecentCommitNumber, err := g.getMostRecentCommit(ctx)
return mostRecentCommitNumber, err
}
- if err := g.preparedStatements[getCommitNumberFromTime].QueryRowContext(ctx, t.Unix()).Scan(&ret); err != nil {
+ if err := g.db.QueryRow(ctx, statements[getCommitNumberFromTime], t.Unix()).Scan(&ret); err != nil {
return ret, skerr.Wrapf(err, "Failed get for time: %q", t)
}
return ret, nil
@@ -455,7 +440,7 @@
// [begin, end), i.e inclusive of begin and exclusive of end.
func (g *Git) CommitSliceFromTimeRange(ctx context.Context, begin, end time.Time) ([]Commit, error) {
g.commitSliceFromTimeRangeCalled.Inc(1)
- rows, err := g.preparedStatements[getCommitsFromTimeRange].QueryContext(ctx, begin.Unix(), end.Unix())
+ rows, err := g.db.Query(ctx, statements[getCommitsFromTimeRange], begin.Unix(), end.Unix())
if err != nil {
return nil, skerr.Wrapf(err, "Failed to query for commit slice in range %s-%s", begin, end)
}
@@ -474,7 +459,7 @@
// [begin, end], i.e inclusive of both begin and end.
func (g *Git) CommitSliceFromCommitNumberRange(ctx context.Context, begin, end types.CommitNumber) ([]Commit, error) {
g.commitSliceFromCommitNumberRangeCalled.Inc(1)
- rows, err := g.preparedStatements[getCommitsFromCommitNumberRange].QueryContext(ctx, begin, end)
+ rows, err := g.db.Query(ctx, statements[getCommitsFromCommitNumberRange], begin, end)
if err != nil {
return nil, skerr.Wrapf(err, "Failed to query for commit slice in range %v-%v", begin, end)
}
@@ -493,7 +478,7 @@
func (g *Git) CommitFromCommitNumber(ctx context.Context, commitNumber types.CommitNumber) (Commit, error) {
g.commitFromCommitNumberCalled.Inc(1)
var c Commit
- if err := g.preparedStatements[getCommitFromCommitNumber].QueryRowContext(ctx, commitNumber).Scan(&c.CommitNumber, &c.GitHash, &c.Timestamp, &c.Author, &c.Subject); err != nil {
+ if err := g.db.QueryRow(ctx, statements[getCommitFromCommitNumber], commitNumber).Scan(&c.CommitNumber, &c.GitHash, &c.Timestamp, &c.Author, &c.Subject); err != nil {
return Commit{}, skerr.Wrapf(err, "Failed to read row at %v", commitNumber)
}
return c, nil
@@ -503,7 +488,7 @@
func (g *Git) GitHashFromCommitNumber(ctx context.Context, commitNumber types.CommitNumber) (string, error) {
g.gitHashFromCommitNumberCalled.Inc(1)
var ret string
- if err := g.preparedStatements[getHashFromCommitNumber].QueryRowContext(ctx, commitNumber).Scan(&ret); err != nil {
+ if err := g.db.QueryRow(ctx, statements[getHashFromCommitNumber], commitNumber).Scan(&ret); err != nil {
return "", skerr.Wrapf(err, "Failed to find git hash for commit number: %v", commitNumber)
}
return ret, nil
diff --git a/perf/go/git/git_test.go b/perf/go/git/git_test.go
index 007b7a3..cdf2693 100644
--- a/perf/go/git/git_test.go
+++ b/perf/go/git/git_test.go
@@ -24,7 +24,7 @@
for name, subTest := range subTests {
t.Run(name, func(t *testing.T) {
- ctx, db, gb, hashes, dialect, instanceConfig, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, db, gb, hashes, dialect, instanceConfig, _, cleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
g, err := New(ctx, true, db, dialect, instanceConfig)
require.NoError(t, err)
diff --git a/perf/go/git/gittest/gittest.go b/perf/go/git/gittest/gittest.go
index 4555abe..57a652f 100644
--- a/perf/go/git/gittest/gittest.go
+++ b/perf/go/git/gittest/gittest.go
@@ -3,13 +3,15 @@
import (
"context"
- "database/sql"
+ "fmt"
"io/ioutil"
+ "math/rand"
"os"
"path/filepath"
"testing"
"time"
+ "github.com/jackc/pgx/v4/pgxpool"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.skia.org/infra/go/git/testutils"
@@ -37,7 +39,7 @@
// The repo is populated with 8 commits, one minute apart, starting at StartTime.
//
// The hashes for each commit are going to be random and so are returned also.
-func NewForTest(t *testing.T, dialect perfsql.Dialect) (context.Context, *sql.DB, *testutils.GitBuilder, []string, perfsql.Dialect, *config.InstanceConfig, CleanupFunc) {
+func NewForTest(t *testing.T, dialect perfsql.Dialect) (context.Context, *pgxpool.Pool, *testutils.GitBuilder, []string, perfsql.Dialect, *config.InstanceConfig, string, CleanupFunc) {
ctx, cancel := context.WithCancel(context.Background())
// Create a git repo for testing purposes.
@@ -52,8 +54,9 @@
hashes = append(hashes, gb.CommitGenAt(ctx, "bar.txt", StartTime.Add(6*time.Minute)))
hashes = append(hashes, gb.CommitGenAt(ctx, "foo.txt", StartTime.Add(7*time.Minute)))
+ dbName := fmt.Sprintf("git%d", rand.Uint64())
// Init our sql database.
- db, sqlCleanup := sqltest.NewCockroachDBForTests(t, CockroachDatabaseName, sqltest.ApplyMigrations)
+ db, sqlCleanup := sqltest.NewCockroachDBForTests(t, dbName, sqltest.ApplyMigrations)
// Get tmp dir to use for repo checkout.
tmpDir, err := ioutil.TempDir("", "git")
@@ -74,5 +77,5 @@
Dir: filepath.Join(tmpDir, "checkout"),
},
}
- return ctx, db, gb, hashes, dialect, instanceConfig, clean
+ return ctx, db, gb, hashes, dialect, instanceConfig, dbName, clean
}
diff --git a/perf/go/ingest/process/process.go b/perf/go/ingest/process/process.go
index 0085a5d..ec5f3c6 100644
--- a/perf/go/ingest/process/process.go
+++ b/perf/go/ingest/process/process.go
@@ -3,6 +3,7 @@
import (
"context"
+ "sync"
"time"
"cloud.google.com/go/pubsub"
@@ -12,16 +13,18 @@
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
+
"go.skia.org/infra/perf/go/builders"
"go.skia.org/infra/perf/go/config"
+ "go.skia.org/infra/perf/go/file"
+ "go.skia.org/infra/perf/go/git"
"go.skia.org/infra/perf/go/ingest/parser"
"go.skia.org/infra/perf/go/ingestevents"
+ "go.skia.org/infra/perf/go/tracestore"
"google.golang.org/api/option"
)
-const (
- gitRefreshDuration = time.Minute
-)
+const writeRetries = 10
// sendPubSubEvent sends the unencoded params and paramset found in a single
// ingested file to the PubSub topic specified in the selected Perf instances
@@ -55,12 +58,8 @@
return skerr.Wrap(err)
}
-// Start a single go routine to process incoming ingestion files and write
-// the data they contain to a trace store.
-//
-// Except for file.Sources of type "dir" this function should never return
-// except on error.
-func Start(ctx context.Context, local bool, instanceConfig *config.InstanceConfig) error {
+// worker ingests files that arrive on the given 'ch' channel.
+func worker(ctx context.Context, wg *sync.WaitGroup, g *git.Git, store tracestore.TraceStore, ch <-chan file.File, pubSubClient *pubsub.Client, instanceConfig *config.InstanceConfig) {
// Metrics.
filesReceived := metrics2.GetCounter("perfserver_ingest_files_received")
failedToParse := metrics2.GetCounter("perfserver_ingest_failed_to_parse")
@@ -70,47 +69,13 @@
successfulWrite := metrics2.GetCounter("perfserver_ingest_successful_write")
successfulWriteCount := metrics2.GetCounter("perfserver_ingest_num_points_written")
- var pubSubClient *pubsub.Client
- if instanceConfig.IngestionConfig.FileIngestionTopicName != "" {
- ts, err := auth.NewDefaultTokenSource(local, pubsub.ScopePubSub)
- if err != nil {
- sklog.Fatalf("Failed to create TokenSource: %s", err)
- }
-
- pubSubClient, err = pubsub.NewClient(ctx, instanceConfig.IngestionConfig.SourceConfig.Project, option.WithTokenSource(ts))
- if err != nil {
- sklog.Fatal(err)
- }
- }
-
- // New file.Source.
- source, err := builders.NewSourceFromConfig(ctx, instanceConfig, local)
- ch, err := source.Start(ctx)
- if err != nil {
- return skerr.Wrap(err)
- }
-
// New Parser.
p := parser.New(instanceConfig)
- // New TraceStore.
- store, err := builders.NewTraceStoreFromConfig(ctx, local, instanceConfig)
- if err != nil {
- return skerr.Wrap(err)
- }
-
- // New perfgit.Git.
- sklog.Infof("Cloning repo %q into %q", instanceConfig.GitRepoConfig.URL, instanceConfig.GitRepoConfig.Dir)
- g, err := builders.NewPerfGitFromConfig(ctx, local, instanceConfig)
- if err != nil {
- return skerr.Wrap(err)
- }
- g.StartBackgroundPolling(ctx, gitRefreshDuration)
-
- sklog.Info("Waiting on files to process.")
for f := range ch {
if err := ctx.Err(); err != nil {
- return skerr.Wrap(err)
+ sklog.Error(err)
+ break
}
sklog.Infof("Ingest received: %v", f)
filesReceived.Inc(1)
@@ -152,10 +117,20 @@
}
sklog.Info("WriteTraces")
- // Write data to the trace store.
- if err := store.WriteTraces(commitNumber, params, values, ps, f.Name, time.Now()); err != nil {
- failedToWrite.Inc(1)
- sklog.Error("Failed to write %v: %s", f, err)
+ const retries = writeRetries
+ i := 0
+ for {
+ // Write data to the trace store.
+ err := store.WriteTraces(commitNumber, params, values, ps, f.Name, time.Now())
+ if err == nil {
+ break
+ }
+ i++
+ if i > retries {
+ failedToWrite.Inc(1)
+ sklog.Errorf("Failed to write after %d retries %q: %s", retries, f.Name, err)
+ break
+ }
}
successfulWrite.Inc(1)
successfulWriteCount.Inc(int64(len(params)))
@@ -166,6 +141,61 @@
sklog.Info("FileIngestionTopicName pubsub message sent.")
}
}
+ wg.Done()
+}
+
+// Start launches numParallelIngesters Go routines to process incoming
+// ingestion files and write the data they contain to a trace store.
+//
+// Except for file.Sources of type "dir" this function should never return
+// except on error.
+func Start(ctx context.Context, local bool, numParallelIngesters int, instanceConfig *config.InstanceConfig) error {
+
+ var pubSubClient *pubsub.Client
+ if instanceConfig.IngestionConfig.FileIngestionTopicName != "" {
+ ts, err := auth.NewDefaultTokenSource(local, pubsub.ScopePubSub)
+ if err != nil {
+ sklog.Fatalf("Failed to create TokenSource: %s", err)
+ }
+
+ pubSubClient, err = pubsub.NewClient(ctx, instanceConfig.IngestionConfig.SourceConfig.Project, option.WithTokenSource(ts))
+ if err != nil {
+ sklog.Fatal(err)
+ }
+ }
+
+ // New file.Source.
+ source, err := builders.NewSourceFromConfig(ctx, instanceConfig, local)
+ ch, err := source.Start(ctx)
+ if err != nil {
+ return skerr.Wrap(err)
+ }
+
+ // New TraceStore.
+ store, err := builders.NewTraceStoreFromConfig(ctx, local, instanceConfig)
+ if err != nil {
+ return skerr.Wrap(err)
+ }
+
+ // New perfgit.Git.
+ sklog.Infof("Cloning repo %q into %q", instanceConfig.GitRepoConfig.URL, instanceConfig.GitRepoConfig.Dir)
+ g, err := builders.NewPerfGitFromConfig(ctx, local, instanceConfig)
+ if err != nil {
+ return skerr.Wrap(err)
+ }
+ // Polling isn't needed because we call update on the repo if we find a git hash we don't recognize.
+ // g.StartBackgroundPolling(ctx, gitRefreshDuration)
+
+ sklog.Info("Waiting on files to process.")
+
+ var wg sync.WaitGroup
+
+ for i := 0; i < numParallelIngesters; i++ {
+ wg.Add(1)
+ go worker(ctx, &wg, g, store, ch, pubSubClient, instanceConfig)
+ }
+ wg.Wait()
+
sklog.Infof("Exited while waiting on files. Should only happen on source_type=dir.")
return nil
}
diff --git a/perf/go/ingest/process/process_test.go b/perf/go/ingest/process/process_test.go
index 07c9144..545ff0f 100644
--- a/perf/go/ingest/process/process_test.go
+++ b/perf/go/ingest/process/process_test.go
@@ -87,7 +87,7 @@
},
}
- err = Start(context.Background(), true, &instanceConfig)
+ err = Start(context.Background(), true, 1, &instanceConfig)
require.NoError(t, err)
// The integration data set has 9 good files, 1 file with a bad commit, and
// 1 malformed JSON file.
diff --git a/perf/go/perfserver/cmd/ingest.go b/perf/go/perfserver/cmd/ingest.go
index 66929d4..1149f4c 100644
--- a/perf/go/perfserver/cmd/ingest.go
+++ b/perf/go/perfserver/cmd/ingest.go
@@ -43,7 +43,7 @@
sklog.Infof("Flags: --%s=%v", f.Name, f.Value)
})
- return process.Start(context.Background(), ingestFlags.Local, instanceConfig)
+ return process.Start(context.Background(), ingestFlags.Local, ingestFlags.NumParallelIngesters, instanceConfig)
},
}
diff --git a/perf/go/regression/dfiter_test.go b/perf/go/regression/dfiter_test.go
index 2fa2e9a..4802152 100644
--- a/perf/go/regression/dfiter_test.go
+++ b/perf/go/regression/dfiter_test.go
@@ -56,7 +56,7 @@
Shards: 8,
}
- store, err := sqltracestore.New(db, perfsql.CockroachDBDialect, cfg)
+ store, err := sqltracestore.New(db, cfg)
require.NoError(t, err)
// Add some points to the first and second tile.
@@ -79,7 +79,7 @@
}, "gs://foo.json", time.Now()) // Time is irrelevent.
assert.NoError(t, err)
- ctx, db, _, _, dialect, instanceConfig, gitCleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
+ ctx, db, _, _, dialect, instanceConfig, _, gitCleanup := gittest.NewForTest(t, perfsql.CockroachDBDialect)
instanceConfig.DataStoreConfig.TileSize = testTileSize
g, err := perfgit.New(ctx, true, db, dialect, instanceConfig)
require.NoError(t, err)
diff --git a/perf/go/regression/sqlregressionstore/sqlregressionstore.go b/perf/go/regression/sqlregressionstore/sqlregressionstore.go
index 48be0fa..1820686 100644
--- a/perf/go/regression/sqlregressionstore/sqlregressionstore.go
+++ b/perf/go/regression/sqlregressionstore/sqlregressionstore.go
@@ -6,9 +6,9 @@
import (
"context"
- "database/sql"
"encoding/json"
+ "github.com/jackc/pgx/v4/pgxpool"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/perf/go/alerts"
@@ -16,7 +16,6 @@
"go.skia.org/infra/perf/go/clustering2"
"go.skia.org/infra/perf/go/dataframe"
"go.skia.org/infra/perf/go/regression"
- perfsql "go.skia.org/infra/perf/go/sql"
"go.skia.org/infra/perf/go/types"
)
@@ -30,19 +29,15 @@
readRange
)
-// statements allows looking up raw SQL statements by their statement id.
-type statements map[statement]string
-
// statementsByDialect holds all the raw SQL statemens used per Dialect of SQL.
-var statementsByDialect = map[perfsql.Dialect]statements{
- perfsql.CockroachDBDialect: {
- write: `
+var statements = map[statement]string{
+ write: `
UPSERT INTO
Regressions (commit_number, alert_id, regression)
VALUES
($1, $2, $3)
`,
- read: `
+ read: `
SELECT
regression
FROM
@@ -50,7 +45,7 @@
WHERE
commit_number=$1 AND
alert_id=$2`,
- readRange: `
+ readRange: `
SELECT
commit_number, alert_id, regression
FROM
@@ -59,42 +54,28 @@
commit_number >= $1
AND commit_number <= $2
`,
- },
}
// SQLRegressionStore implements the regression.Store interface.
type SQLRegressionStore struct {
// db is the underlying database.
- db *sql.DB
-
- // preparedStatements are all the prepared SQL statements.
- preparedStatements map[statement]*sql.Stmt
+ db *pgxpool.Pool
}
// New returns a new *SQLRegressionStore.
//
// We presume all migrations have been run against db before this function is
// called.
-func New(db *sql.DB, dialect perfsql.Dialect) (*SQLRegressionStore, error) {
- preparedStatements := map[statement]*sql.Stmt{}
- for key, statement := range statementsByDialect[dialect] {
- prepared, err := db.Prepare(statement)
- if err != nil {
- return nil, skerr.Wrapf(err, "Failed to prepare statment %v %q", key, statement)
- }
- preparedStatements[key] = prepared
- }
-
+func New(db *pgxpool.Pool) (*SQLRegressionStore, error) {
return &SQLRegressionStore{
- db: db,
- preparedStatements: preparedStatements,
+ db: db,
}, nil
}
// Range implements the regression.Store interface.
func (s *SQLRegressionStore) Range(ctx context.Context, begin, end types.CommitNumber) (map[types.CommitNumber]*regression.AllRegressionsForCommit, error) {
ret := map[types.CommitNumber]*regression.AllRegressionsForCommit{}
- rows, err := s.preparedStatements[readRange].QueryContext(ctx, begin, end)
+ rows, err := s.db.Query(ctx, statements[readRange], begin, end)
if err != nil {
return nil, skerr.Wrapf(err, "Failed to read regressions in range: %d %d", begin, end)
}
@@ -190,7 +171,7 @@
if err != nil {
return skerr.Wrapf(err, "Failed to serialize regression for alertID: %d commitNumber=%d", alertID, commitNumber)
}
- if _, err := s.preparedStatements[write].ExecContext(ctx, commitNumber, alertID, string(b)); err != nil {
+ if _, err := s.db.Exec(ctx, statements[write], commitNumber, alertID, string(b)); err != nil {
return skerr.Wrapf(err, "Failed to write regression for alertID: %d commitNumber=%d", alertID, commitNumber)
}
return nil
@@ -204,7 +185,7 @@
return nil, skerr.Fmt("Failed to convert alertIDString %q to an int.", alertIDString)
}
var jsonString string
- if err := s.preparedStatements[read].QueryRowContext(ctx, commitNumber, alertID).Scan(&jsonString); err != nil {
+ if err := s.db.QueryRow(ctx, statements[read], commitNumber, alertID).Scan(&jsonString); err != nil {
return nil, skerr.Wrapf(err, "Failed to read regression for alertID: %d commitNumber=%d", alertID, commitNumber)
}
r := regression.NewRegression()
@@ -228,7 +209,7 @@
}
// Do everything in a transaction so we don't have any lost updates.
- tx, err := s.db.BeginTx(ctx, nil)
+ tx, err := s.db.Begin(ctx)
if err != nil {
return skerr.Wrapf(err, "Can't start transaction")
}
@@ -238,13 +219,13 @@
// Read the regression from the database. If any part of that fails then
// just use the default regression we've already constructed.
var jsonString string
- if err := tx.StmtContext(ctx, s.preparedStatements[read]).QueryRowContext(ctx, commitNumber, alertID).Scan(&jsonString); err == nil {
+ if err := tx.QueryRow(ctx, statements[read], commitNumber, alertID).Scan(&jsonString); err == nil {
if err := json.Unmarshal([]byte(jsonString), r); err != nil {
sklog.Warningf("Failed to deserialize the JSON Regression: %s", err)
}
} else {
if mustExist {
- if err := tx.Rollback(); err != nil {
+ if err := tx.Rollback(ctx); err != nil {
sklog.Errorf("Failed on rollback: %s", err)
}
return skerr.Wrapf(err, "Regression doesn't exist.")
@@ -255,19 +236,19 @@
b, err := json.Marshal(r)
if err != nil {
- if err := tx.Rollback(); err != nil {
+ if err := tx.Rollback(ctx); err != nil {
sklog.Errorf("Failed on rollback: %s", err)
}
return skerr.Wrapf(err, "Failed to serialize regression for alertID: %d commitNumber=%d", alertID, commitNumber)
}
- if _, err := tx.StmtContext(ctx, s.preparedStatements[write]).ExecContext(ctx, commitNumber, alertID, string(b)); err != nil {
- if err := tx.Rollback(); err != nil {
+ if _, err := tx.Exec(ctx, statements[write], commitNumber, alertID, string(b)); err != nil {
+ if err := tx.Rollback(ctx); err != nil {
sklog.Errorf("Failed on rollback: %s", err)
}
return skerr.Wrapf(err, "Failed to write regression for alertID: %d commitNumber=%d", alertID, commitNumber)
}
- return tx.Commit()
+ return tx.Commit(ctx)
}
// Confirm that SQLRegressionStore implements regression.Store.
diff --git a/perf/go/regression/sqlregressionstore/sqlregressionstore_test.go b/perf/go/regression/sqlregressionstore/sqlregressionstore_test.go
index 73b4f6d..1246441 100644
--- a/perf/go/regression/sqlregressionstore/sqlregressionstore_test.go
+++ b/perf/go/regression/sqlregressionstore/sqlregressionstore_test.go
@@ -9,7 +9,6 @@
"go.skia.org/infra/go/testutils/unittest"
"go.skia.org/infra/perf/go/regression"
"go.skia.org/infra/perf/go/regression/regressiontest"
- perfsql "go.skia.org/infra/perf/go/sql"
"go.skia.org/infra/perf/go/sql/sqltest"
)
@@ -24,7 +23,7 @@
// actual errors.
defer cleanup()
- store, err := New(db, perfsql.CockroachDBDialect)
+ store, err := New(db)
require.NoError(t, err)
subTest(t, store)
})
@@ -38,7 +37,7 @@
// actual errors.
defer cleanup()
- store, err := New(db, perfsql.CockroachDBDialect)
+ store, err := New(db)
require.NoError(t, err)
subTest(t, store)
})
diff --git a/perf/go/shortcut/sqlshortcutstore/sqlshortcutstore.go b/perf/go/shortcut/sqlshortcutstore/sqlshortcutstore.go
index 333e428..7b536a3 100644
--- a/perf/go/shortcut/sqlshortcutstore/sqlshortcutstore.go
+++ b/perf/go/shortcut/sqlshortcutstore/sqlshortcutstore.go
@@ -5,10 +5,10 @@
import (
"context"
- "database/sql"
"encoding/json"
"io"
+ "github.com/jackc/pgx/v4/pgxpool"
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
@@ -26,20 +26,16 @@
getAllShortcuts
)
-// statements allows looking up raw SQL statements by their statement id.
-type statements map[statement]string
-
-// statementsByDialect holds all the raw SQL statemens used per Dialect of SQL.
-var statementsByDialect = map[perfsql.Dialect]statements{
- perfsql.CockroachDBDialect: {
- insertShortcut: `
+// statements holds all the raw SQL statements.
+var statements = map[statement]string{
+ insertShortcut: `
INSERT INTO
Shortcuts (id, trace_ids)
VALUES
($1, $2)
ON CONFLICT
DO NOTHING`,
- getShortcut: `
+ getShortcut: `
SELECT
(trace_ids)
FROM
@@ -47,38 +43,27 @@
WHERE
id=$1
`,
- getAllShortcuts: `
+ getAllShortcuts: `
SELECT
(trace_ids)
FROM
Shortcuts
`,
- },
}
// SQLShortcutStore implements the shortcut.Store interface using an SQL
// database.
type SQLShortcutStore struct {
- // preparedStatements are all the prepared SQL statements.
- preparedStatements map[statement]*sql.Stmt
+ db *pgxpool.Pool
}
// New returns a new *SQLShortcutStore.
//
// We presume all migrations have been run against db before this function is
// called.
-func New(db *sql.DB, dialect perfsql.Dialect) (*SQLShortcutStore, error) {
- preparedStatements := map[statement]*sql.Stmt{}
- for key, statement := range statementsByDialect[dialect] {
- prepared, err := db.Prepare(statement)
- if err != nil {
- return nil, skerr.Wrapf(err, "Failed to prepare statment %v %q", key, statement)
- }
- preparedStatements[key] = prepared
- }
-
+func New(db *pgxpool.Pool, dialect perfsql.Dialect) (*SQLShortcutStore, error) {
return &SQLShortcutStore{
- preparedStatements: preparedStatements,
+ db: db,
}, nil
}
@@ -103,7 +88,7 @@
if err != nil {
return "", err
}
- if _, err := s.preparedStatements[insertShortcut].ExecContext(ctx, id, string(b)); err != nil {
+ if _, err := s.db.Exec(ctx, statements[insertShortcut], id, string(b)); err != nil {
return "", skerr.Wrap(err)
}
return id, nil
@@ -112,7 +97,7 @@
// Get implements the shortcut.Store interface.
func (s *SQLShortcutStore) Get(ctx context.Context, id string) (*shortcut.Shortcut, error) {
var encoded string
- if err := s.preparedStatements[getShortcut].QueryRowContext(ctx, id).Scan(&encoded); err != nil {
+ if err := s.db.QueryRow(ctx, statements[getShortcut], id).Scan(&encoded); err != nil {
return nil, skerr.Wrapf(err, "Failed to load shortcuts.")
}
var sc shortcut.Shortcut
@@ -127,7 +112,7 @@
func (s *SQLShortcutStore) GetAll(ctx context.Context) (<-chan *shortcut.Shortcut, error) {
ret := make(chan *shortcut.Shortcut)
- rows, err := s.preparedStatements[getAllShortcuts].QueryContext(ctx)
+ rows, err := s.db.Query(ctx, statements[getAllShortcuts])
if err != nil {
return ret, skerr.Wrapf(err, "Failed to query for all shortcuts.")
}
@@ -151,5 +136,4 @@
}()
return ret, nil
-
}
diff --git a/perf/go/sql/sqltest/sqltest.go b/perf/go/sql/sqltest/sqltest.go
index 50602ba..46db2a5 100644
--- a/perf/go/sql/sqltest/sqltest.go
+++ b/perf/go/sql/sqltest/sqltest.go
@@ -1,10 +1,12 @@
package sqltest
import (
+ "context"
"database/sql"
"fmt"
"testing"
+ "github.com/jackc/pgx/v4/pgxpool"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
perfsql "go.skia.org/infra/perf/go/sql"
@@ -37,7 +39,7 @@
// test.
//
// If migrations to are be applied then set applyMigrations to true.
-func NewCockroachDBForTests(t *testing.T, databaseName string, applyMigrations ApplyMigrationsOption) (*sql.DB, Cleanup) {
+func NewCockroachDBForTests(t *testing.T, databaseName string, applyMigrations ApplyMigrationsOption) (*pgxpool.Pool, Cleanup) {
// Note that the migrationsConnection is different from the sql.Open
// connection string since migrations know about CockroachDB, but we use the
// Postgres driver for the database/sql connection since there's no native
@@ -63,12 +65,17 @@
require.NoError(t, err)
}
+ ctx := context.Background()
+ conn, err := pgxpool.Connect(ctx, connectionString)
+ require.NoError(t, err)
+
cleanup := func() {
// Don't bother applying migrations.Down since we aren't testing
// migrations here and it just slows down the tests.
_, err = db.Exec(fmt.Sprintf("DROP DATABASE %s CASCADE;", databaseName))
assert.NoError(t, err)
+ require.NoError(t, db.Close())
+ conn.Close()
}
-
- return db, cleanup
+ return conn, cleanup
}
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore.go b/perf/go/tracestore/sqltracestore/sqltracestore.go
index b7b3325..39452c9 100644
--- a/perf/go/tracestore/sqltracestore/sqltracestore.go
+++ b/perf/go/tracestore/sqltracestore/sqltracestore.go
@@ -5,119 +5,89 @@
traces into an SQL database.
Each trace name, which is a structured key (See /infra/go/query) of the form
-,key1=value1,key2=value2,..., is stored in the TraceIds table so we can use the
-much shorter 64 bit trace_id in other tables.
+,key1=value1,key2=value2,..., is stored in the TraceNames table so we can use the
+much shorter 128 bit md5 hash in trace_id in other tables. The value of the
+trace name is parsed into a paramtools.Params and stored in the 'params' column
+with an inverted index, which enables all the queries that Perf supports.
- TraceIDs (
- trace_id INTEGER PRIMARY KEY,
- trace_name TEXT UNIQUE NOT NULL
- )
+ CREATE TABLE IF NOT EXISTS TraceNames (
+ -- md5(trace_name)
+ trace_id BYTES PRIMARY KEY,
+ -- The params that make up the trace_id, {"arch=x86", "config=8888"}.
+ params JSONB NOT NULL,
+ INVERTED INDEX (params)
+ );
Similarly we store the name of every source file that has been ingested in the
SourceFiles table so we can use the shorter 64 bit source_file_id in other
tables.
- SourceFiles (
- source_file_id INTEGER PRIMARY KEY,
- source_file TEXT UNIQUE NOT NULL
- )
+ SourceFiles (
+ source_file_id INTEGER PRIMARY KEY,
+ source_file TEXT UNIQUE NOT NULL
+ )
+ CREATE TABLE IF NOT EXISTS SourceFiles (
+ source_file_id INT PRIMARY KEY DEFAULT unique_rowid(),
+ source_file STRING UNIQUE NOT NULL
+ );
-We store the values of each trace in the TraceValues table, and use the trace_id
+We store the values of each trace in the TraceValues2 table, and use the trace_id
and the commit_number as the primary key. We also store not only the value but
the id of the source file that the value came from.
- TraceValues (
- trace_id INTEGER,
- commit_number INTEGER,
- val REAL,
- source_file_id INTEGER,
- PRIMARY KEY (trace_id, commit_number)
- )
+ CREATE TABLE IF NOT EXISTS TraceValues2 (
+ -- md5(trace_name) from TraceNames.
+ trace_id BYTES,
+ -- A types.CommitNumber.
+ commit_number INT,
+ -- The floating point measurement.
+ val REAL,
+ -- Id of the source filename, from SourceFiles.
+ source_file_id INT,
+ PRIMARY KEY (trace_id, commit_number)
+ );
Just using this table we can construct some useful queries. For example
we can count the number of traces in a single tile, in this case the
0th tile in a system with a tileSize of 256:
- SELECT
- COUNT(DISTINCT trace_id)
- FROM
- TraceValues
- WHERE
- commit_number >= 0 AND commit_number < 256;
+ SELECT
+ COUNT(DISTINCT trace_id)
+ FROM
+ TraceValues2
+ WHERE
+ commit_number >= 0 AND commit_number < 256;
-The Postings table is our inverted index for looking up which trace ids
-contain which key=value pairs. For a good introduction to postings and search
-https://www.tbray.org/ongoing/When/200x/2003/06/18/HowSearchWorks is a good
-resource.
+The JSONB serialized Params in the TraceNames table allows
+building ParamSets for a range of commits:
-Remember that each trace name is a structured key of the form
-,arch=x86,config=8888,..., and that over time traces may come and go, i.e. we
-may stop running a test, or start running new tests, so if we want to make
-searching for traces efficient we need to be aware of how those trace ids change
-over time. The answer is to break our store in Tiles, i.e. blocks of commits of
-tileSize length, and then for each Tile we keep an inverted index of the trace
-ids. This allows us to not only construct fast queries, but to also do things
-like build ParamSets, a collection of all the keys and all their values ever
-seen for a particular Tile.
+ SELECT
+ DISTINCT TraceNames.params
+ FROM
+ TraceNames
+ INNER JOIN TraceValues2 ON TraceNames.trace_id = TraceValues2.trace_id
+ WHERE
+ TraceValues2.commit_number >= 0
+ AND TraceValues2.commit_number < 512;
-In the table below we store a key_value which is the literal "key=value" part of
-a trace name, along with the tile_number and the 64 bit trace id. Note that
-tile_number is just int(commitNumber/tileSize).
-
- Postings (
- tile_number INTEGER,
- key_value text NOT NULL,
- trace_id INTEGER,
- PRIMARY KEY (tile_number, key_value, trace_id)
- )
-
-So for example to build a ParamSet from Postings:
-
- SELECT DISTINCT
- key_value
- FROM
- Postings
- WHERE
- tile_number=0;
-
-To find the most recent tile:
-
- SELECT
- tile_number
- FROM
- Postings
- ORDER BY
- tile_number DESC LIMIT 1;
And finally, to retrieve all the trace values that
-would match a query, we first start with sub-queries for
-each of the common keys in the query, which produce the
-trace_ids, which are then ANDed across all the distinct
-keys in the query. Finally that list is inner joined to the
-TraceValues table to load up all the values.
+would match a query:
- SELECT
- TraceIDs.trace_name, TraceValues.commit_number, TraceValues.val
- FROM
- TraceIDs
- INNER JOIN
- TraceValues
- ON
- TraceValues.trace_id = TraceIDs.trace_id
- WHERE
- TraceValues.trace_id IN (
- SELECT trace_id FROM Postings
- WHERE key_value IN ("arch=x86", "arch=arm")
- AND tile_number=0
- )
- AND
- TraceValues.trace_id IN (
- SELECT trace_id FROM Postings
- WHERE key_value IN ("config=8888")
- AND tile_number=0
- );
+ SELECT
+ TraceNames.params,
+ TraceValues2.commit_number,
+ TraceValues2.val
+ FROM
+ TraceNames
+ INNER JOIN TraceValues2 ON TraceValues2.trace_id = TraceNames.trace_id
+ WHERE
+ TraceNames.params ->> 'arch' IN ('x86')
+ AND TraceNames.params ->> 'config' IN ('565', '8888')
+ AND TraceValues2.commit_number >= 0
+ AND TraceValues2.commit_number < 255;
-Look in migrations/test.sql for more example of raw queries using
+Look in migrations/cdb.sql for more examples of raw queries using
a simple example dataset.
*/
package sqltracestore
@@ -126,15 +96,14 @@
"bytes"
"context"
"crypto/md5"
- "database/sql"
- "encoding/hex"
+ "encoding/json"
"fmt"
"sort"
- "strconv"
- "strings"
"text/template"
"time"
+ "github.com/jackc/pgx/v4"
+ "github.com/jackc/pgx/v4/pgxpool"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/query"
@@ -145,27 +114,31 @@
"go.skia.org/infra/go/vec32"
"go.skia.org/infra/perf/go/cache"
"go.skia.org/infra/perf/go/cache/local"
- "go.skia.org/infra/perf/go/cache/memcached"
"go.skia.org/infra/perf/go/config"
- perfsql "go.skia.org/infra/perf/go/sql"
"go.skia.org/infra/perf/go/tracestore"
"go.skia.org/infra/perf/go/tracestore/sqltracestore/engine"
"go.skia.org/infra/perf/go/types"
)
+const writeTracesChunkSize = 200
+
// defaultCacheSize is the size of the in-memory LRU cache if no size was
// specified in the config file.
const defaultCacheSize = 20 * 1000 * 1000
-// Ingest instances have around 8 cores and many ingested files have ~10K
-// values, so pick a batch size that allows roughly one request per core.
-const traceValuesInsertBatchSize = 1000
+// traceIDForSQL is the type of the IDs that are used in the SQL queries,
+// they are hex encoded md5 hashes of a trace name, e.g. "\x00112233...".
+// Note the \x prefix which tells CockroachDB that this is hex encoded.
+type traceIDForSQL string
-// traceIDFromSQL is the type of the IDs that are used in the SQL database for
-// traces.
-type traceIDFromSQL int64
+var badTraceIDFromSQL traceIDForSQL = ""
-const badTraceIDFromSQL traceIDFromSQL = -1
+// Calculates the traceIDForSQL for the given trace name, e.g. "\x00112233...".
+// Note the \x prefix which tells CockroachDB that this is hex encoded.
+func traceIDForSQLFromTraceName(traceName string) traceIDForSQL {
+ b := md5.Sum([]byte(traceName))
+ return traceIDForSQL(fmt.Sprintf("\\x%x", b))
+}
// sourceFileIDFromSQL is the type of the IDs that are used in the SQL database
// for source files.
@@ -176,189 +149,231 @@
// statement is an SQL statement or fragment of an SQL statement.
type statement int
-// All the different statements we need.
+// All the different statements we need. Each statement will appear either in
+// templates or statements.
const (
insertIntoSourceFiles statement = iota
getSourceFileID
- insertIntoTraceIDs
+ replaceTraceNames
getTraceID
- insertIntoPostings
replaceTraceValues
- countIndices
- getLatestTile
+ getLatestCommit
paramSetForTile
getSource
traceCount
+ queryTraces
+ queryTracesIDOnly
+ readTraces
)
-type statements map[statement]string
-
-type templates map[statement]string
-
-var templatesByDialect = map[perfsql.Dialect]templates{
- perfsql.CockroachDBDialect: {
- insertIntoPostings: `
- UPSERT INTO
- Postings (tile_number, key_value, trace_id)
- VALUES
- {{ range $index, $element := . -}}
- {{ if $index }},{{end}}({{ $element.TileNumber }}, '{{ $element.KeyValue }}', {{ $element.TraceID }})
- {{ end }}`,
- replaceTraceValues: `
- UPSERT INTO
- TraceValues (trace_id, commit_number, val, source_file_id)
- VALUES
- {{ range $index, $element := . -}}
- {{ if $index }},{{end}}({{ $element.TraceID }}, {{ $element.CommitNumber }}, {{ $element.Val }}, {{ $element.SourceFileID }})
- {{ end }}`,
- },
+var templates = map[statement]string{
+ replaceTraceValues: `INSERT INTO
+ TraceValues2 (trace_id, commit_number, val, source_file_id)
+ VALUES
+ {{ range $index, $element := . -}}
+ {{ if $index }},{{end}}
+ (
+ '{{ $element.MD5HexTraceID }}', {{ $element.CommitNumber }}, {{ $element.Val }}, {{ $element.SourceFileID }}
+ )
+ {{ end }}
+ ON CONFLICT
+ DO NOTHING
+ `,
+ replaceTraceNames: `INSERT INTO
+ TraceNames (trace_id, params)
+ VALUES
+ {{ range $index, $element := . -}}
+ {{ if $index }},{{end}}
+ (
+ '{{ $element.MD5HexTraceID }}', '{{ $element.JSONParams }}'
+ )
+ {{ end }}
+ ON CONFLICT
+ DO NOTHING
+ `,
+ queryTraces: `
+ SELECT
+ TraceNames.params,
+ TraceValues2.commit_number,
+ TraceValues2.val
+ FROM
+ TraceNames
+ INNER JOIN TraceValues2 ON TraceValues2.trace_id = TraceNames.trace_id
+ WHERE
+ TraceValues2.commit_number >= {{ .BeginCommitNumber }}
+ AND TraceValues2.commit_number < {{ .EndCommitNumber }}
+ {{ range $key, $values := .QueryPlan }}
+ AND TraceNames.params ->> '{{ $key }}' IN
+ (
+ {{ range $index, $value := $values -}}
+ {{ if $index }},{{end}} '{{ $value }}'
+ {{ end }}
+ )
+ {{ end }}
+ `,
+ queryTracesIDOnly: `
+ SELECT
+ TraceNames.params
+ FROM
+ TraceNames
+ INNER JOIN TraceValues2 ON TraceValues2.trace_id = TraceNames.trace_id
+ WHERE
+ TraceValues2.commit_number >= {{ .BeginCommitNumber }}
+ AND TraceValues2.commit_number < {{ .EndCommitNumber }}
+ {{ range $key, $values := .QueryPlan }}
+ AND TraceNames.params ->> '{{ $key }}' IN
+ (
+ {{ range $index, $value := $values -}}
+ {{ if $index }},{{end}} '{{ $value }}'
+ {{ end }}
+ )
+ {{ end }}
+ `,
+ readTraces: `
+ SELECT
+ TraceNames.params,
+ TraceValues2.commit_number,
+ TraceValues2.val
+ FROM
+ TraceNames
+ INNER JOIN TraceValues2 ON TraceValues2.trace_id = TraceNames.trace_id
+ WHERE
+ TraceValues2.commit_number >= {{ .BeginCommitNumber }}
+ AND TraceValues2.commit_number < {{ .EndCommitNumber }}
+ AND TraceValues2.trace_id IN
+ (
+ {{ range $index, $trace_id := .TraceIDs -}}
+ {{ if $index }},{{end}} '{{ $trace_id }}'
+ {{ end }}
+ )
+ `,
+ getSource: `
+ SELECT
+ SourceFiles.source_file
+ FROM
+ TraceNames
+ INNER JOIN TraceValues2 ON TraceValues2.trace_id = TraceNames.trace_id
+ INNER JOIN SourceFiles ON SourceFiles.source_file_id = TraceValues2.source_file_id
+ WHERE
+ TraceNames.trace_id = '{{ .MD5HexTraceID }}'
+ AND TraceValues2.commit_number = {{ .CommitNumber }}`,
}
-// insertIntoPostingsValue represents the values used in the insertIntoPostings template.
-type insertIntoPostingsValue struct {
- TileNumber types.TileNumber
- KeyValue string
- TraceID traceIDFromSQL
-}
+// replaceTraceValuesContext is the context for the replaceTraceValues template.
+type replaceTraceValuesContext struct {
+ // The MD5 sum of the trace name as a hex string, i.e.
+ // "\xfe385b159ff55dca481069805e5ff050". Note the leading \x which
+ // CockroachDB will use to know the string is in hex.
+ MD5HexTraceID traceIDForSQL
-// replaceTraceValue is used in the replaceTraceValues template.
-type replaceTraceValue struct {
- TraceID traceIDFromSQL
CommitNumber types.CommitNumber
Val float32
SourceFileID sourceFileIDFromSQL
}
-var statementsByDialect = map[perfsql.Dialect]statements{
- perfsql.CockroachDBDialect: {
- insertIntoSourceFiles: `
- INSERT INTO
- SourceFiles (source_file)
- VALUES
- ($1)
- ON CONFLICT
- DO NOTHING`,
- getSourceFileID: `
- SELECT
- source_file_id
- FROM
- SourceFiles
- WHERE
- source_file=$1`,
- insertIntoTraceIDs: `
- INSERT INTO
- TraceIDs (trace_name)
- VALUES
- ($1)
- ON CONFLICT
- DO NOTHING`,
- getTraceID: `
- SELECT
- trace_id
- FROM
- TraceIDs
- WHERE
- trace_name=$1`,
- insertIntoPostings: `
- INSERT INTO
- Postings (tile_number, key_value, trace_id)
- VALUES
- ($1, $2, $3)
- ON CONFLICT
- DO NOTHING`,
- countIndices: `
- SELECT
- COUNT(*)
- FROM
- Postings
- WHERE
- tile_number=$1`,
- getLatestTile: `
- SELECT
- tile_number
- FROM
- Postings
- ORDER BY
- tile_number DESC
- LIMIT 1`,
- paramSetForTile: `
- SELECT DISTINCT
- key_value
- FROM
- Postings
- WHERE
- tile_number=$1`,
- getSource: `
- SELECT
- SourceFiles.source_file
- FROM
- TraceIDs
- INNER JOIN
- TraceValues ON TraceValues.trace_id = TraceIDs.trace_id
- INNER JOIN
- SourceFiles ON SourceFiles.source_file_id = TraceValues.source_file_id
- WHERE
- TraceIDs.trace_name=$1 AND TraceValues.commit_number=$2`,
- traceCount: `
- SELECT
- COUNT(DISTINCT trace_id)
- FROM
- TraceValues
- WHERE
- commit_number >= $1 AND commit_number <= $2`,
- },
+// replaceTraceNamesContext is the context for the replaceTraceNames template.
+type replaceTraceNamesContext struct {
+ // The trace's Params serialize as JSON.
+ JSONParams string
+
+ // The MD5 sum of the trace name as a hex string, i.e.
+ // "\xfe385b159ff55dca481069805e5ff050". Note the leading \x which
+ // CockroachDB will use to know the string is in hex.
+ MD5HexTraceID traceIDForSQL
+}
+
+// queryTracesContext is the context for the queryTraces template.
+type queryTracesContext struct {
+ BeginCommitNumber types.CommitNumber
+ EndCommitNumber types.CommitNumber
+ QueryPlan paramtools.ParamSet
+}
+
+// readTracesContext is the context for the readTraces template.
+type readTracesContext struct {
+ BeginCommitNumber types.CommitNumber
+ EndCommitNumber types.CommitNumber
+ TraceIDs []traceIDForSQL
+}
+
+// getSourceContext is the context for the getSource template.
+type getSourceContext struct {
+ CommitNumber types.CommitNumber
+
+ // The MD5 sum of the trace name as a hex string, i.e.
+ // "\xfe385b159ff55dca481069805e5ff050". Note the leading \x which
+ // CockroachDB will use to know the string is in hex.
+ MD5HexTraceID traceIDForSQL
+}
+
+var statements = map[statement]string{
+ insertIntoSourceFiles: `
+ INSERT INTO
+ SourceFiles (source_file)
+ VALUES
+ ($1)
+ ON CONFLICT
+ DO NOTHING`,
+ getSourceFileID: `
+ SELECT
+ source_file_id
+ FROM
+ SourceFiles
+ WHERE
+ source_file=$1`,
+ getLatestCommit: `
+ SELECT
+ commit_number
+ FROM
+ TraceValues2
+ ORDER BY
+ commit_number DESC
+ LIMIT
+ 1;`,
+ paramSetForTile: `
+ SELECT
+ DISTINCT TraceNames.params
+ FROM
+ TraceNames
+ INNER JOIN TraceValues2 ON TraceNames.trace_id = TraceValues2.trace_id
+ WHERE
+ TraceValues2.commit_number >= $1
+ AND TraceValues2.commit_number < $2;`,
+ traceCount: `
+ SELECT
+ COUNT(DISTINCT trace_id)
+ FROM
+ TraceValues2
+ WHERE
+ commit_number >= $1 AND commit_number <= $2`,
}
// SQLTraceStore implements tracestore.TraceStore backed onto an SQL database.
type SQLTraceStore struct {
// db is the SQL database instance.
- db *sql.DB
-
- // preparedStatements are all the prepared SQL statements.
- preparedStatements map[statement]*sql.Stmt
+ db *pgxpool.Pool
// unpreparedStatements are parsed templates that can be used to construct SQL statements.
unpreparedStatements map[statement]*template.Template
- // cache is an LRU cache used to store the ids (traceIDFromSQL) of hashed trace names (string):
- // "md5(traceName)" -> traceIDFromSQL
- // and if a traces postings have been written for a tile (bool).
- // "traceID-tileNumber" -> bool
- //
- // For traceNames the md5 sum is used to keep the cache keys shorter, thus preserving
- // memory and also allowing the use of memcached which restricts keys to 250 chars.
cache cache.Cache
// tileSize is the number of commits per Tile.
tileSize int32
// metrics
- writeTracesMetric metrics2.Float64SummaryMetric
- writeTracesMetricSQL metrics2.Float64SummaryMetric
- updateTraceValuesMetric metrics2.Float64SummaryMetric
- buildTracesContextMetric metrics2.Float64SummaryMetric
- writeTraceIDAndPostingsCacheHitMetric metrics2.Counter
- writeTraceIDAndPostingsCacheMissMetric metrics2.Counter
- writeTraceIDAndPostingsPostingsCacheHitMetric metrics2.Counter
- writeTraceIDAndPostingsPostingsCacheMissMetric metrics2.Counter
+ writeTracesMetric metrics2.Float64SummaryMetric
+ writeTracesMetricSQL metrics2.Float64SummaryMetric
+ buildTracesContextsMetric metrics2.Float64SummaryMetric
}
// New returns a new *SQLTraceStore.
//
// We presume all migrations have been run against db before this function is
// called.
-func New(db *sql.DB, dialect perfsql.Dialect, datastoreConfig config.DataStoreConfig) (*SQLTraceStore, error) {
- preparedStatements := map[statement]*sql.Stmt{}
- for key, statement := range statementsByDialect[dialect] {
- prepared, err := db.Prepare(statement)
- if err != nil {
- return nil, skerr.Wrapf(err, "preparing statement %v, %q", key, statement)
- }
- preparedStatements[key] = prepared
- }
-
+func New(db *pgxpool.Pool, datastoreConfig config.DataStoreConfig) (*SQLTraceStore, error) {
unpreparedStatements := map[statement]*template.Template{}
- for key, tmpl := range templatesByDialect[dialect] {
+ for key, tmpl := range templates {
t, err := template.New("").Parse(tmpl)
if err != nil {
return nil, skerr.Wrapf(err, "parsing template %v, %q", key, tmpl)
@@ -366,44 +381,19 @@
unpreparedStatements[key] = t
}
- var memoryCacheSize = datastoreConfig.Cache.Size
- if memoryCacheSize == 0 {
- memoryCacheSize = defaultCacheSize
- }
-
- var err error
- var cache cache.Cache
- if len(datastoreConfig.Cache.MemcachedServers) > 0 && datastoreConfig.Cache.Namespace != "" {
- cache, err = memcached.New(datastoreConfig.Cache.MemcachedServers, datastoreConfig.Cache.Namespace)
- if err != nil {
- // Fall back to in-memory cache.
- cache, err = local.New(memoryCacheSize)
- if err != nil {
- return nil, skerr.Wrap(err)
- }
-
- }
- } else {
- cache, err = local.New(memoryCacheSize)
- if err != nil {
- return nil, skerr.Wrap(err)
- }
+ cache, err := local.New(defaultCacheSize)
+ if err != nil {
+ return nil, skerr.Wrap(err)
}
return &SQLTraceStore{
- db: db,
- preparedStatements: preparedStatements,
- unpreparedStatements: unpreparedStatements,
- cache: cache,
- tileSize: datastoreConfig.TileSize,
- writeTracesMetric: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_writeTraces"),
- writeTracesMetricSQL: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_writeTracesSQL"),
- updateTraceValuesMetric: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_updateTraceValues"),
- buildTracesContextMetric: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_buildTracesContext"),
- writeTraceIDAndPostingsCacheHitMetric: metrics2.GetCounter("perfserver_sqltracestore_writeTraceIDAndPostings_cache_hit"),
- writeTraceIDAndPostingsCacheMissMetric: metrics2.GetCounter("perfserver_sqltracestore_writeTraceIDAndPostings_cache_miss"),
- writeTraceIDAndPostingsPostingsCacheHitMetric: metrics2.GetCounter("perfserver_sqltracestore_writeTraceIDAndPostings_postings_cache_hit"),
- writeTraceIDAndPostingsPostingsCacheMissMetric: metrics2.GetCounter("perfserver_sqltracestore_writeTraceIDAndPostings_postings_cache_miss"),
+ db: db,
+ unpreparedStatements: unpreparedStatements,
+ tileSize: datastoreConfig.TileSize,
+ cache: cache,
+ writeTracesMetric: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_writeTraces"),
+ writeTracesMetricSQL: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_writeTracesSQL"),
+ buildTracesContextsMetric: metrics2.GetFloat64SummaryMetric("perfserver_sqltracestore_buildTracesContext"),
}, nil
}
@@ -416,41 +406,47 @@
// CountIndices implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) CountIndices(ctx context.Context, tileNumber types.TileNumber) (int64, error) {
- var ret int64
- if err := s.preparedStatements[countIndices].QueryRowContext(ctx, tileNumber).Scan(&ret); err != nil {
- return 0, skerr.Wrap(err)
- }
- return ret, nil
+
+ // This doesn't make any sense for the SQL implementation of TraceStore.
+ return 0, nil
}
// GetLatestTile implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) GetLatestTile() (types.TileNumber, error) {
- tileNumber := types.BadTileNumber
- if err := s.preparedStatements[getLatestTile].QueryRowContext(context.TODO()).Scan(&tileNumber); err != nil {
- return tileNumber, skerr.Wrap(err)
+ mostRecentCommit := types.BadCommitNumber
+ if err := s.db.QueryRow(context.TODO(), statements[getLatestCommit]).Scan(&mostRecentCommit); err != nil {
+ return types.BadTileNumber, skerr.Wrap(err)
}
- return tileNumber, nil
+ return types.TileNumberFromCommitNumber(mostRecentCommit, s.tileSize), nil
}
func (s *SQLTraceStore) paramSetForTile(tileNumber types.TileNumber) (paramtools.ParamSet, error) {
- rows, err := s.preparedStatements[paramSetForTile].QueryContext(context.TODO(), tileNumber)
+ // Convert the tile number into a range of commits, since we don't store data by
+ // tile anymore.
+ beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
+
+ rows, err := s.db.Query(context.TODO(), statements[paramSetForTile], beginCommit, endCommit)
if err != nil {
- return nil, skerr.Wrapf(err, "tileNumer=%d", tileNumber)
+ fmt.Printf("%q %d %d", statements[paramSetForTile], beginCommit, endCommit)
+ return nil, skerr.Wrapf(err, "Failed querying - tileNumber=%d", tileNumber)
}
ret := paramtools.NewParamSet()
for rows.Next() {
- var keyValue string
- if err := rows.Scan(&keyValue); err != nil {
- return nil, skerr.Wrapf(err, "tileNumer=%d", tileNumber)
+ var jsonParams string
+ if err := rows.Scan(&jsonParams); err != nil {
+ return nil, skerr.Wrapf(err, "Failed scanning row - tileNumber=%d", tileNumber)
}
- parts := strings.Split(keyValue, "=")
- if len(parts) != 2 {
- return nil, skerr.Fmt("Invalid key=value form: %q", keyValue)
+ var ps paramtools.Params
+ if err := json.Unmarshal([]byte(jsonParams), &ps); err != nil {
+ return nil, skerr.Wrapf(err, "Failed unmarshal - tileNumber=%d", tileNumber)
}
- ret.AddParams(paramtools.Params{parts[0]: parts[1]})
+ ret.AddParams(ps)
+ }
+ if err == pgx.ErrNoRows {
+ return ret, nil
}
if err := rows.Err(); err != nil {
- return nil, skerr.Wrapf(err, "tileNumer=%d", tileNumber)
+ return nil, skerr.Wrapf(err, "Other failure - tileNumber=%d", tileNumber)
}
ret.Normalize()
return ret, nil
@@ -471,8 +467,21 @@
// GetSource implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) GetSource(ctx context.Context, commitNumber types.CommitNumber, traceName string) (string, error) {
var filename string
- if err := s.preparedStatements[getSource].QueryRowContext(ctx, traceName, commitNumber).Scan(&filename); err != nil {
- return "", skerr.Wrapf(err, "commitNumber=%d traceName=%q", commitNumber, traceName)
+ traceID := traceIDForSQLFromTraceName(traceName)
+
+ sourceContext := getSourceContext{
+ MD5HexTraceID: traceID,
+ CommitNumber: commitNumber,
+ }
+
+ var b bytes.Buffer
+ if err := s.unpreparedStatements[getSource].Execute(&b, sourceContext); err != nil {
+ return "", skerr.Wrapf(err, "failed to expand get source template")
+ }
+ sql := b.String()
+
+ if err := s.db.QueryRow(ctx, sql).Scan(&filename); err != nil {
+ return "", skerr.Wrapf(err, "commitNumber=%d traceName=%q traceID=%q", commitNumber, traceName, traceID)
}
return filename, nil
}
@@ -507,64 +516,56 @@
if err := query.ValidateParamSet(plan); err != nil {
return nil, skerr.Wrapf(err, "invalid query %#v", *q)
}
- tileNumberString := strconv.FormatInt(int64(tileNumber), 10)
+
+ // Prepare the template context.
beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
-
- stmt := fmt.Sprintf(`
-SELECT
- TraceIDs.trace_name, TraceValues.commit_number, TraceValues.val
-FROM
- TraceIDs
-INNER JOIN
- TraceValues
-ON
- TraceValues.trace_id = TraceIDs.trace_id
-WHERE
- TraceValues.commit_number >= %d
- AND TraceValues.commit_number <= %d
- AND `, beginCommit, endCommit)
-
- // TODO(jcgregorio) Break out into own function.
- whereClauses := make([]string, len(plan))
- clauseIndex := 0
- for key, values := range plan {
- inClause := make([]string, len(values))
- for i, value := range values {
- inClause[i] = fmt.Sprintf("'%s=%s'", key, value)
- }
- whereClauses[clauseIndex] = `
- TraceValues.trace_id IN (
- SELECT trace_id FROM Postings WHERE key_value IN (` +
- strings.Join(inClause, ",") +
- `) AND tile_number=` + tileNumberString + `
- )`
- clauseIndex++
+ context := queryTracesContext{
+ BeginCommitNumber: beginCommit,
+ EndCommitNumber: endCommit,
+ QueryPlan: plan,
}
- fullStatement := stmt + strings.Join(whereClauses, " AND ")
- rows, err := s.db.QueryContext(context.TODO(), fullStatement)
+ // Expand the template for the SQL.
+ var b bytes.Buffer
+ if err := s.unpreparedStatements[queryTraces].Execute(&b, context); err != nil {
+ return nil, skerr.Wrapf(err, "failed to expand trace names template")
+ }
+
+ sql := b.String()
+ // Execute the query.
+ rows, err := s.db.Query(ctx, sql)
if err != nil {
- sklog.Debugf("QueryTracesByIndex: fullStatement: %q", fullStatement)
return nil, skerr.Wrap(err)
}
ret := types.TraceSet{}
-
for rows.Next() {
- var traceName string
+ var jsonParams string
var commitNumber types.CommitNumber
var val float64
- if err := rows.Scan(&traceName, &commitNumber, &val); err != nil {
+ if err := rows.Scan(&jsonParams, &commitNumber, &val); err != nil {
return nil, skerr.Wrap(err)
}
+
+ p := paramtools.Params{}
+ if err := json.Unmarshal([]byte(jsonParams), &p); err != nil {
+ sklog.Warningf("Invalid JSON params found in query response: %s", err)
+ continue
+ }
+ traceName, err := query.MakeKey(p)
+ if err != nil {
+ sklog.Warningf("Invalid trace name found in query response: %s", err)
+ continue
+ }
+ offset := s.OffsetFromCommitNumber(commitNumber)
if _, ok := ret[traceName]; ok {
- ret[traceName][s.OffsetFromCommitNumber(commitNumber)] = float32(val)
+ ret[traceName][offset] = float32(val)
} else {
// TODO(jcgregorio) Replace this vec32.New() with a
// https://golang.org/pkg/sync/#Pool since this is our most used/reused
// type of memory.
ret[traceName] = vec32.New(int(s.tileSize))
- ret[traceName][s.OffsetFromCommitNumber(commitNumber)] = float32(val)
+ ret[traceName][offset] = float32(val)
}
}
if err := rows.Err(); err != nil {
@@ -604,74 +605,52 @@
close(outParams)
return outParams, nil
}
+
// Sanitize our inputs.
if err := query.ValidateParamSet(plan); err != nil {
- close(outParams)
- return outParams, skerr.Wrapf(err, "invalid query %#v", *q)
+ return nil, skerr.Wrapf(err, "invalid query %#v", *q)
}
- tileNumberString := strconv.FormatInt(int64(tileNumber), 10)
+ // Prepare the template context.
beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
-
- stmt := fmt.Sprintf(`
-SELECT DISTINCT
- TraceIDs.trace_name
-FROM
- TraceIDs
-INNER JOIN
- TraceValues
-ON
- TraceValues.trace_id = TraceIDs.trace_id
-WHERE
- TraceValues.commit_number >= %d
- AND TraceValues.commit_number <= %d
- AND `, beginCommit, endCommit)
-
- // TODO(jcgregorio) Break out into own function.
- // TODO(jcgregorio) Convert to use text/templates.
- whereClauses := make([]string, len(plan))
- clauseIndex := 0
- for key, values := range plan {
- inClause := make([]string, len(values))
- for i, value := range values {
- inClause[i] = fmt.Sprintf("'%s=%s'", key, value)
- }
- whereClauses[clauseIndex] = `
- TraceValues.trace_id IN (
- SELECT trace_id FROM Postings WHERE key_value IN (` +
- strings.Join(inClause, ",") +
- `) AND tile_number=` + tileNumberString + `
- )`
- clauseIndex++
+ context := queryTracesContext{
+ BeginCommitNumber: beginCommit,
+ EndCommitNumber: endCommit,
+ QueryPlan: plan,
}
- fullStatement := stmt + strings.Join(whereClauses, " AND ")
- rows, err := s.db.QueryContext(context.TODO(), fullStatement)
+ // Expand the template for the SQL.
+ var b bytes.Buffer
+ if err := s.unpreparedStatements[queryTracesIDOnly].Execute(&b, context); err != nil {
+ return nil, skerr.Wrapf(err, "failed to expand trace names template")
+ }
+
+ // Execute the query.
+ rows, err := s.db.Query(ctx, b.String())
if err != nil {
- sklog.Debugf("QueryTracesIDOnlyByIndex: fullStatement: %q", fullStatement)
- close(outParams)
- return outParams, skerr.Wrap(err)
+ return nil, skerr.Wrap(err)
}
go func() {
defer close(outParams)
for rows.Next() {
- var traceName string
- if err := rows.Scan(&traceName); err != nil {
+ var jsonParams string
+ if err := rows.Scan(&jsonParams); err != nil {
sklog.Errorf("Failed to scan traceName: %s", skerr.Wrap(err))
return
}
- p, err := query.ParseKey(traceName)
- if err != nil {
+
+ p := paramtools.Params{}
+ if err := json.Unmarshal([]byte(jsonParams), &p); err != nil {
sklog.Errorf("Failed to parse traceName: %s", skerr.Wrap(err))
- return
+ continue
}
outParams <- p
+
}
if err := rows.Err(); err != nil {
sklog.Errorf("Failed while reading traceNames: %s", skerr.Wrap(err))
- return
}
}()
@@ -696,39 +675,51 @@
// Get the traceIDs for the given keys.
beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
- stmt := fmt.Sprintf(`
-SELECT
- TraceIDs.trace_name, TraceValues.commit_number, TraceValues.val
-FROM
- TraceIDs
-INNER JOIN
- TraceValues ON TraceValues.trace_id = TraceIDs.trace_id
-WHERE
- TraceValues.commit_number >= %d
- AND TraceValues.commit_number <= %d
- AND TraceIDs.trace_name IN (`, beginCommit, endCommit)
- inClause := make([]string, len(keys))
- for i, key := range keys {
- singleQuoted := "'" + key + "'"
- inClause[i] = singleQuoted
+
+ // Populate the context for the SQL template.
+ readTracesContext := readTracesContext{
+ BeginCommitNumber: beginCommit,
+ EndCommitNumber: endCommit,
+ TraceIDs: make([]traceIDForSQL, 0, len(keys)),
}
- fullStatement := stmt + strings.Join(inClause, ",") + ")"
- rows, err := s.db.QueryContext(context.TODO(), fullStatement)
+
+ for _, traceName := range keys {
+ readTracesContext.TraceIDs = append(readTracesContext.TraceIDs, traceIDForSQLFromTraceName(traceName))
+ }
+ // Expand the template for the SQL.
+ var b bytes.Buffer
+ if err := s.unpreparedStatements[readTraces].Execute(&b, readTracesContext); err != nil {
+ return nil, skerr.Wrapf(err, "failed to expand read traces template")
+ }
+
+ // Execute the query.
+ rows, err := s.db.Query(context.TODO(), b.String())
if err != nil {
- sklog.Debugf("ReadTraces: fullStatement: %q", fullStatement)
return nil, skerr.Wrap(err)
}
+
for rows.Next() {
- var traceName string
+ var jsonParams string
var commitNumber types.CommitNumber
var val float64
- if err := rows.Scan(&traceName, &commitNumber, &val); err != nil {
+ if err := rows.Scan(&jsonParams, &commitNumber, &val); err != nil {
return nil, skerr.Wrap(err)
}
+
+ p := paramtools.Params{}
+ if err := json.Unmarshal([]byte(jsonParams), &p); err != nil {
+ sklog.Warningf("Invalid JSON params found in query response: %s", err)
+ continue
+ }
+ traceName, err := query.MakeKey(p)
+ if err != nil {
+ sklog.Warningf("Invalid trace name found in query response: %s", err)
+ continue
+ }
ret[traceName][s.OffsetFromCommitNumber(commitNumber)] = float32(val)
}
if err := rows.Err(); err != nil {
- return nil, skerr.Wrapf(err, "tileNumber=%d", tileNumber)
+ return nil, skerr.Wrap(err)
}
return ret, nil
@@ -748,7 +739,7 @@
func (s *SQLTraceStore) TraceCount(ctx context.Context, tileNumber types.TileNumber) (int64, error) {
var ret int64
beginCommit, endCommit := types.TileCommitRangeForTileNumber(tileNumber, s.tileSize)
- err := s.preparedStatements[traceCount].QueryRowContext(context.TODO(), beginCommit, endCommit).Scan(&ret)
+ err := s.db.QueryRow(context.TODO(), statements[traceCount], beginCommit, endCommit).Scan(&ret)
return ret, skerr.Wrap(err)
}
@@ -762,11 +753,11 @@
// the sourceFileIDFromSQL of that filename.
func (s *SQLTraceStore) updateSourceFile(filename string) (sourceFileIDFromSQL, error) {
ret := badSourceFileIDFromSQL
- _, err := s.preparedStatements[insertIntoSourceFiles].ExecContext(context.TODO(), filename)
+ _, err := s.db.Exec(context.TODO(), statements[insertIntoSourceFiles], filename)
if err != nil {
return ret, skerr.Wrap(err)
}
- err = s.preparedStatements[getSourceFileID].QueryRowContext(context.TODO(), filename).Scan(&ret)
+ err = s.db.QueryRow(context.TODO(), statements[getSourceFileID], filename).Scan(&ret)
if err != nil {
return ret, skerr.Wrap(err)
}
@@ -774,155 +765,99 @@
return ret, nil
}
-// updatePostings writes all the entries into our inverted index in Postings for
-// the given traceID and tileNumber. The Params given are from the parse trace
-// name.
-func (s *SQLTraceStore) updatePostings(p paramtools.Params, tileNumber types.TileNumber, traceID traceIDFromSQL) error {
- // Clean the data to avoid SQL injection attacks.
- p = query.ForceValid(p)
-
- // Prepare the data for the SQL template.
- values := make([]insertIntoPostingsValue, 0, len(p))
- for k, v := range p {
- keyValue := fmt.Sprintf("%s=%s", k, v)
- values = append(values, insertIntoPostingsValue{
- TileNumber: tileNumber,
- KeyValue: keyValue,
- TraceID: traceID,
- })
- }
-
- // Expand the template.
- var b bytes.Buffer
- if err := s.unpreparedStatements[insertIntoPostings].Execute(&b, values); err != nil {
- return skerr.Wrapf(err, "Failed to expand insertIntoPostings template with: %v", values)
- }
-
- _, err := s.db.ExecContext(context.TODO(), b.String())
- if err != nil {
- return skerr.Wrap(err)
- }
-
- return nil
-}
-
-func getPostingsCacheEntryKey(traceID traceIDFromSQL, tileNumber types.TileNumber) string {
- return fmt.Sprintf("%d-%d", traceID, tileNumber)
-}
-
-func getHashedTraceName(traceName string) string {
- sum := md5.Sum([]byte(traceName))
- return hex.EncodeToString(sum[:])
-}
-
-func (s *SQLTraceStore) getTraceIDFromCache(hashedTraceName string) (traceIDFromSQL, bool) {
- if traceIDString, ok := s.cache.Get(hashedTraceName); ok {
- traceIDInt64, err := strconv.ParseInt(traceIDString, 10, 64)
- if err != nil {
- return badTraceIDFromSQL, false
- }
- return traceIDFromSQL(traceIDInt64), true
- }
- return badTraceIDFromSQL, false
-}
-
-// postingsCacheValue is what we write to the cache when postings exist for the
-// given key. The actual value doesn't matter since we only check for existence
-// when reading the cache.
-const postingsCacheValue = "1"
-
-// writeTraceIDAndPostings writes the trace name into the TraceIDs table and
-// returns the traceIDFromSQL of that trace name. This operation will happen
-// repeatedly as data is ingested so we cache the results in the LRU cache.
-func (s *SQLTraceStore) writeTraceIDAndPostings(traceNameAsParams paramtools.Params, tileNumber types.TileNumber) (traceIDFromSQL, error) {
- traceID := badTraceIDFromSQL
-
- traceName, err := query.MakeKey(traceNameAsParams)
- if err != nil {
- return traceID, skerr.Wrap(err)
- }
- hashedTraceName := getHashedTraceName(traceName)
-
- // Get a traceIDFromSQL for the traceName.
- var ok bool
- if traceID, ok = s.getTraceIDFromCache(hashedTraceName); ok {
- s.writeTraceIDAndPostingsCacheHitMetric.Inc(1)
- } else {
- s.writeTraceIDAndPostingsCacheMissMetric.Inc(1)
- _, err := s.preparedStatements[insertIntoTraceIDs].ExecContext(context.TODO(), traceName)
- if err != nil {
- return traceID, skerr.Wrapf(err, "traceName=%q", traceName)
- }
- err = s.preparedStatements[getTraceID].QueryRowContext(context.TODO(), traceName).Scan(&traceID)
- if err != nil {
- return traceID, skerr.Wrapf(err, "traceName=%q", traceName)
- }
- s.cache.Add(hashedTraceName, fmt.Sprintf("%d", traceID))
- }
- postingsCacheEntryKey := getPostingsCacheEntryKey(traceID, tileNumber)
- if !s.cache.Exists(postingsCacheEntryKey) {
- s.writeTraceIDAndPostingsPostingsCacheMissMetric.Inc(1)
- // TODO(jcgregorio) Do a query on the database to see if the postings are there.
-
- // Update postings.
- if err := s.updatePostings(traceNameAsParams, tileNumber, traceID); err != nil {
- return traceID, skerr.Wrapf(err, "traceName=%q tileNumber=%d traceID=%d", traceName, tileNumber, traceID)
- }
- s.cache.Add(postingsCacheEntryKey, postingsCacheValue)
- } else {
- s.writeTraceIDAndPostingsPostingsCacheHitMetric.Inc(1)
- }
-
- return traceID, nil
-}
-
-// updateTraceValues writes the given slice of replaceTraceValues into the store.
-func (s *SQLTraceStore) updateTraceValues(templateContext []replaceTraceValue) error {
- defer timer.NewWithSummary("perfserver_sqltracestore_updateTraceValues", s.updateTraceValuesMetric).Stop()
- var b bytes.Buffer
- if err := s.unpreparedStatements[replaceTraceValues].Execute(&b, templateContext); err != nil {
- return skerr.Wrapf(err, "failed to expand template")
- }
- _, err := s.db.ExecContext(context.TODO(), b.String())
- return skerr.Wrap(err)
-}
-
// WriteTraces implements the tracestore.TraceStore interface.
func (s *SQLTraceStore) WriteTraces(commitNumber types.CommitNumber, params []paramtools.Params, values []float32, _ paramtools.ParamSet, source string, _ time.Time) error {
defer timer.NewWithSummary("perfserver_sqltracestore_writeTraces", s.writeTracesMetric).Stop()
- tileNumber := types.TileNumberFromCommitNumber(commitNumber, s.tileSize)
// Get the row id for the source file.
sourceID, err := s.updateSourceFile(source)
if err != nil {
return skerr.Wrap(err)
}
- t := timer.NewWithSummary("perfserver_sqltracestore_buildTracesContext", s.buildTracesContextMetric)
+ t := timer.NewWithSummary("perfserver_sqltracestore_buildTracesContexts", s.buildTracesContextsMetric)
// Build the 'context' which will be used to populate the SQL template.
- templateContext := make([]replaceTraceValue, 0, len(params))
+ namesTemplateContext := make([]replaceTraceNamesContext, 0, len(params))
+ valuesTemplateContext := make([]replaceTraceValuesContext, 0, len(params))
+
for i, p := range params {
- traceID, err := s.writeTraceIDAndPostings(p, tileNumber)
+ traceName, err := query.MakeKey(p)
if err != nil {
- t.Stop()
- return skerr.Wrap(err)
+ continue
}
- templateContext = append(templateContext, replaceTraceValue{
- TraceID: traceID,
- CommitNumber: commitNumber,
- Val: values[i],
- SourceFileID: sourceID,
+ traceID := traceIDForSQLFromTraceName(traceName)
+ jsonParams, err := json.Marshal(p)
+ if err != nil {
+ continue
+ }
+ valuesTemplateContext = append(valuesTemplateContext, replaceTraceValuesContext{
+ MD5HexTraceID: traceID,
+ CommitNumber: commitNumber,
+ Val: values[i],
+ SourceFileID: sourceID,
})
+
+ if !s.cache.Exists(string(traceID)) {
+ namesTemplateContext = append(namesTemplateContext, replaceTraceNamesContext{
+ MD5HexTraceID: traceID,
+ JSONParams: string(jsonParams),
+ })
+ }
}
t.Stop()
defer timer.NewWithSummary("perfserver_sqltracestore_writeTraces_sql", s.writeTracesMetricSQL).Stop()
- err = util.ChunkIterParallel(context.TODO(), len(templateContext), traceValuesInsertBatchSize, func(ctx context.Context, startIdx int, endIdx int) error {
- if err := s.updateTraceValues(templateContext[startIdx:endIdx]); err != nil {
- return skerr.Wrapf(err, "failed inserting subSlice: [%d:%d]", startIdx, endIdx)
+ sklog.Infof("About to format %d trace names", len(params))
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+ defer cancel()
+
+ if len(namesTemplateContext) > 0 {
+
+ err = util.ChunkIter(len(namesTemplateContext), 100, func(startIdx int, endIdx int) error {
+ var b bytes.Buffer
+ if err := s.unpreparedStatements[replaceTraceNames].Execute(&b, namesTemplateContext[startIdx:endIdx]); err != nil {
+ return skerr.Wrapf(err, "failed to expand trace names template on slice [%d, %d]", startIdx, endIdx)
+ }
+ sql := b.String()
+
+ sklog.Infof("About to write %d trace names with sql of length %d", len(params), len(sql))
+ if _, err := s.db.Exec(ctx, sql); err != nil {
+ return skerr.Wrapf(err, "Executing: %q", b.String())
+ }
+ return nil
+ })
+
+ if err != nil {
+ return err
+ }
+
+ for _, entry := range namesTemplateContext {
+ s.cache.Add(string(entry.MD5HexTraceID), "1")
+ }
+ }
+
+ sklog.Infof("About to format %d trace values", len(params))
+
+ err = util.ChunkIter(len(valuesTemplateContext), writeTracesChunkSize, func(startIdx int, endIdx int) error {
+ var b bytes.Buffer
+ if err := s.unpreparedStatements[replaceTraceValues].Execute(&b, valuesTemplateContext[startIdx:endIdx]); err != nil {
+ return skerr.Wrapf(err, "failed to expand trace values template")
+ }
+
+ sql := b.String()
+ sklog.Infof("About to write %d trace values with sql of length %d", len(params), len(sql))
+ if _, err := s.db.Exec(ctx, sql); err != nil {
+ return skerr.Wrapf(err, "Executing: %q", b.String())
}
return nil
})
+ if err != nil {
+ return err
+ }
+
+ sklog.Info("Finished writing trace values.")
+
return err
}
diff --git a/perf/go/tracestore/sqltracestore/sqltracestore_test.go b/perf/go/tracestore/sqltracestore/sqltracestore_test.go
index 9d404bc..c4604ad 100644
--- a/perf/go/tracestore/sqltracestore/sqltracestore_test.go
+++ b/perf/go/tracestore/sqltracestore/sqltracestore_test.go
@@ -3,6 +3,7 @@
import (
"context"
"fmt"
+ "math/rand"
"testing"
"time"
@@ -13,7 +14,6 @@
"go.skia.org/infra/go/testutils/unittest"
"go.skia.org/infra/go/vec32"
"go.skia.org/infra/perf/go/config"
- perfsql "go.skia.org/infra/perf/go/sql"
"go.skia.org/infra/perf/go/sql/sqltest"
"go.skia.org/infra/perf/go/types"
)
@@ -23,36 +23,36 @@
e = vec32.MissingDataSentinel
// testTileSize is the size of tiles we use for tests.
- testTileSize = 8
+ testTileSize = int32(8)
)
-func TestCockroachDB(t *testing.T) {
- unittest.LargeTest(t)
-
- cfg := config.DataStoreConfig{
- TileSize: testTileSize,
- Project: "test",
- Instance: "test",
- Table: "test",
- Shards: 8,
- }
-
- for name, subTest := range subTests {
- t.Run(name, func(t *testing.T) {
- db, cleanup := sqltest.NewCockroachDBForTests(t, "tracestore", sqltest.ApplyMigrations)
- // Commenting out the defer cleanup() can sometimes make failures
- // easier to understand.
- defer cleanup()
-
- store, err := New(db, perfsql.CockroachDBDialect, cfg)
- require.NoError(t, err)
-
- subTest(t, store)
- })
- }
+var cfg = config.DataStoreConfig{
+ TileSize: testTileSize,
+ Project: "test",
+ Instance: "test",
+ Table: "test",
+ Shards: 8,
}
-func testUpdateSourceFile(t *testing.T, s *SQLTraceStore) {
+func commonTestSetup(t *testing.T, populateTraces bool) (context.Context, *SQLTraceStore, sqltest.Cleanup) {
+ unittest.LargeTest(t)
+ ctx := context.Background()
+ db, cleanup := sqltest.NewCockroachDBForTests(t, fmt.Sprintf("tracestore%d", rand.Int63()), sqltest.ApplyMigrations)
+
+ store, err := New(db, cfg)
+ require.NoError(t, err)
+
+ if populateTraces {
+ populatedTestDB(t, store)
+ }
+
+ return ctx, store, cleanup
+}
+
+func TestUpdateSourceFile(t *testing.T) {
+ _, s, cleanup := commonTestSetup(t, false)
+ defer cleanup()
+
// Do each update twice to ensure the IDs don't change.
id, err := s.updateSourceFile("foo.txt")
assert.NoError(t, err)
@@ -69,90 +69,9 @@
assert.Equal(t, id, id2)
}
-func testWriteTraceIDAndPostings(t *testing.T, s *SQLTraceStore) {
- const traceName = ",arch=x86,config=8888,"
- p := paramtools.NewParams(traceName)
- const tileNumber types.TileNumber = 1
-
- // Do each update twice to ensure the IDs don't change.
- traceID, err := s.writeTraceIDAndPostings(p, tileNumber)
- assert.NoError(t, err)
-
- traceID2, err := s.writeTraceIDAndPostings(p, tileNumber)
- assert.NoError(t, err)
- assert.Equal(t, traceID, traceID2)
-
- // Confirm the cache entries exist.
- got, ok := s.getTraceIDFromCache(getHashedTraceName(traceName))
- require.True(t, ok)
- assert.Equal(t, traceID, got)
- assert.True(t, s.cache.Exists(getPostingsCacheEntryKey(traceID, tileNumber)))
-
- const traceName2 = ",arch=arm,config=8888,"
- p2 := paramtools.NewParams(traceName2)
-
- traceID, err = s.writeTraceIDAndPostings(p2, tileNumber)
- assert.NoError(t, err)
- assert.NotEqual(t, traceID, traceID2)
-
- traceID2, err = s.writeTraceIDAndPostings(p2, tileNumber)
- assert.NoError(t, err)
- assert.Equal(t, traceID, traceID2)
-}
-
-func testWriteTraces_MultipleBatches_Success(t *testing.T, s *SQLTraceStore) {
- ctx := context.Background()
-
- const commitNumber = types.CommitNumber(1)
-
- // Add enough values to force it to be done in batches.
- const testLength = 2*traceValuesInsertBatchSize + 1
-
- const tileNumber = types.TileNumber(0)
-
- traceNames := make([]paramtools.Params, 0, testLength)
- values := make([]float32, 0, testLength)
-
- for i := 0; i < testLength; i++ {
- traceNames = append(traceNames, paramtools.Params{
- "traceid": fmt.Sprintf("%d", i),
- "config": "8888",
- })
- values = append(values, float32(i))
- }
- err := s.WriteTraces(
- commitNumber,
- traceNames,
- values,
- paramtools.ParamSet{}, // ParamSet is empty because WriteTraces doesn't use it in this impl.
- "gs://not-tested-as-part-of-this-test.json",
- time.Time{}) // time is unused in this impl of TraceStore.
- require.NoError(t, err)
-
- // Confirm all traces were written.
- q, err := query.NewFromString("config=8888")
- require.NoError(t, err)
- ts, err := s.QueryTracesByIndex(ctx, tileNumber, q)
- assert.NoError(t, err)
- assert.Len(t, ts, testLength)
-
- // Spot test some values.
- q, err = query.NewFromString("config=8888&traceid=0")
- require.NoError(t, err)
- ts, err = s.QueryTracesByIndex(ctx, tileNumber, q)
- assert.NoError(t, err)
-
- assert.Equal(t, float32(0), ts[",config=8888,traceid=0,"][s.OffsetFromCommitNumber(commitNumber)])
-
- q, err = query.NewFromString(fmt.Sprintf("config=8888&traceid=%d", testLength-1))
- require.NoError(t, err)
- ts, err = s.QueryTracesByIndex(ctx, tileNumber, q)
- assert.NoError(t, err)
- assert.Equal(t, float32(testLength-1), ts[fmt.Sprintf(",config=8888,traceid=%d,", testLength-1)][s.OffsetFromCommitNumber(commitNumber)])
-}
-
-func testReadTraces(t *testing.T, s *SQLTraceStore) {
- populatedTestDB(t, s)
+func TestReadTraces(t *testing.T) {
+ _, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
keys := []string{
",arch=x86,config=8888,",
@@ -174,8 +93,9 @@
}, ts)
}
-func testReadTraces_InvalidKey(t *testing.T, s *SQLTraceStore) {
- populatedTestDB(t, s)
+func TestReadTraces_InvalidKey(t *testing.T) {
+ _, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
keys := []string{
",arch=x86,config='); DROP TABLE TraceValues,",
@@ -186,8 +106,9 @@
require.Error(t, err)
}
-func testReadTraces_NoResults(t *testing.T, s *SQLTraceStore) {
- populatedTestDB(t, s)
+func TestReadTraces_NoResults(t *testing.T) {
+ _, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
keys := []string{
",arch=unknown,",
@@ -200,8 +121,9 @@
})
}
-func testReadTraces_EmptyTileReturnsNoData(t *testing.T, s *SQLTraceStore) {
- populatedTestDB(t, s)
+func TestReadTraces_EmptyTileReturnsNoData(t *testing.T) {
+ _, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
keys := []string{
",arch=x86,config=8888,",
@@ -217,9 +139,9 @@
})
}
-func testQueryTracesIDOnlyByIndex_EmptyQueryReturnsError(t *testing.T, s *SQLTraceStore) {
- populatedTestDB(t, s)
- ctx := context.Background()
+func TestQueryTracesIDOnlyByIndex_EmptyQueryReturnsError(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
// Query that matches one trace.
q, err := query.NewFromString("")
@@ -240,9 +162,9 @@
return ret
}
-func testQueryTracesIDOnlyByIndex_EmptyTileReturnsEmptyParamset(t *testing.T, s *SQLTraceStore) {
- populatedTestDB(t, s)
- ctx := context.Background()
+func TestQueryTracesIDOnlyByIndex_EmptyTileReturnsEmptyParamset(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
// Query that matches one trace.
q, err := query.NewFromString("config=565")
@@ -252,9 +174,9 @@
assert.Empty(t, paramSetFromParamsChan(ch))
}
-func testQueryTracesIDOnlyByIndex_MatchesOneTrace(t *testing.T, s *SQLTraceStore) {
- populatedTestDB(t, s)
- ctx := context.Background()
+func TestQueryTracesIDOnlyByIndex_MatchesOneTrace(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
// Query that matches one trace.
q, err := query.NewFromString("config=565")
@@ -268,9 +190,9 @@
assert.Equal(t, expected, paramSetFromParamsChan(ch))
}
-func testQueryTracesIDOnlyByIndex_MatchesTwoTraces(t *testing.T, s *SQLTraceStore) {
- populatedTestDB(t, s)
- ctx := context.Background()
+func TestQueryTracesIDOnlyByIndex_MatchesTwoTraces(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
// Query that matches two traces.
q, err := query.NewFromString("arch=x86")
@@ -284,9 +206,9 @@
assert.Equal(t, expected, paramSetFromParamsChan(ch))
}
-func testQueryTracesByIndex_MatchesOneTrace(t *testing.T, s *SQLTraceStore) {
- populatedTestDB(t, s)
- ctx := context.Background()
+func TestQueryTracesByIndex_MatchesOneTrace(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
// Query that matches one trace.
q, err := query.NewFromString("config=565")
@@ -298,9 +220,9 @@
})
}
-func testQueryTracesByIndex_MatchesOneTraceInTheSecondTile(t *testing.T, s *SQLTraceStore) {
- populatedTestDB(t, s)
- ctx := context.Background()
+func TestQueryTracesByIndex_MatchesOneTraceInTheSecondTile(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
// Query that matches one trace second tile.
q, err := query.NewFromString("config=565")
@@ -312,9 +234,9 @@
})
}
-func testQueryTracesByIndex_MatchesTwoTraces(t *testing.T, s *SQLTraceStore) {
- populatedTestDB(t, s)
- ctx := context.Background()
+func TestQueryTracesByIndex_MatchesTwoTraces(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
// Query that matches two traces.
q, err := query.NewFromString("arch=x86")
@@ -327,9 +249,9 @@
})
}
-func testQueryTracesByIndex_QueryHasUnknownParamReturnsNoError(t *testing.T, s *SQLTraceStore) {
- populatedTestDB(t, s)
- ctx := context.Background()
+func TestQueryTracesByIndex_QueryHasUnknownParamReturnsNoError(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
// Query that has no matching params in the given tile.
q, err := query.NewFromString("arch=unknown")
@@ -339,8 +261,9 @@
assert.Nil(t, ts)
}
-func testQueryTracesByIndex_QueryAgainstTileWithNoDataReturnsNoError(t *testing.T, s *SQLTraceStore) {
- ctx := context.Background()
+func TestQueryTracesByIndex_QueryAgainstTileWithNoDataReturnsNoError(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, false)
+ defer cleanup()
// Query that has no Postings for the given tile.
q, err := query.NewFromString("arch=unknown")
@@ -350,9 +273,9 @@
assert.Nil(t, ts)
}
-func testTraceCount(t *testing.T, s *SQLTraceStore) {
- populatedTestDB(t, s)
- ctx := context.Background()
+func TestTraceCount(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
count, err := s.TraceCount(ctx, 0)
assert.NoError(t, err)
@@ -367,79 +290,65 @@
assert.Equal(t, int64(0), count)
}
-func testParamSetForTile(t *testing.T, s *SQLTraceStore) {
- const tileNumber types.TileNumber = 1
- _, err := s.writeTraceIDAndPostings(paramtools.NewParams(",config=8888,arch=x86,"), tileNumber)
- assert.NoError(t, err)
- _, err = s.writeTraceIDAndPostings(paramtools.NewParams(",config=565,arch=arm,"), tileNumber)
- assert.NoError(t, err)
- _, err = s.writeTraceIDAndPostings(paramtools.NewParams(",config=8888,arch=arm64,"), tileNumber)
- assert.NoError(t, err)
- _, err = s.writeTraceIDAndPostings(paramtools.NewParams(",config=gpu,arch=x86_64,"), tileNumber)
- assert.NoError(t, err)
+func TestParamSetForTile(t *testing.T) {
+ _, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
ps, err := s.paramSetForTile(1)
assert.NoError(t, err)
expected := paramtools.ParamSet{
- "arch": []string{"arm", "arm64", "x86", "x86_64"},
- "config": []string{"565", "8888", "gpu"},
+ "arch": []string{"x86"},
+ "config": []string{"565", "8888"},
}
assert.Equal(t, expected, ps)
}
-func testParamSetForTile_Empty(t *testing.T, s *SQLTraceStore) {
+func TestParamSetForTile_Empty(t *testing.T) {
+ _, s, cleanup := commonTestSetup(t, false)
+ defer cleanup()
+
// Test the empty case where there is no data in the store.
ps, err := s.paramSetForTile(1)
assert.NoError(t, err)
assert.Equal(t, paramtools.ParamSet{}, ps)
}
-func testGetLatestTile(t *testing.T, s *SQLTraceStore) {
- _, err := s.writeTraceIDAndPostings(paramtools.NewParams(",config=8888,arch=x86,"), types.TileNumber(1))
- assert.NoError(t, err)
- _, err = s.writeTraceIDAndPostings(paramtools.NewParams(",config=8888,arch=arm64,"), types.TileNumber(5))
- assert.NoError(t, err)
- _, err = s.writeTraceIDAndPostings(paramtools.NewParams(",config=gpu,arch=x86_64,"), types.TileNumber(7))
- assert.NoError(t, err)
+func TestGetLatestTile(t *testing.T) {
+ _, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
tileNumber, err := s.GetLatestTile()
assert.NoError(t, err)
- assert.Equal(t, types.TileNumber(7), tileNumber)
+ assert.Equal(t, types.TileNumber(1), tileNumber)
}
-func testGetLatestTile_Empty(t *testing.T, s *SQLTraceStore) {
+func TestGetLatestTile_Empty(t *testing.T) {
+ _, s, cleanup := commonTestSetup(t, false)
+ defer cleanup()
+
// Test the empty case where there is no data in datastore.
tileNumber, err := s.GetLatestTile()
assert.Error(t, err)
assert.Equal(t, types.BadTileNumber, tileNumber)
}
-func testGetOrderedParamSet(t *testing.T, s *SQLTraceStore) {
- ctx := context.Background()
-
- const tileNumber types.TileNumber = 1
- // Now add some trace ids.
- _, err := s.writeTraceIDAndPostings(paramtools.NewParams(",config=8888,arch=x86,"), tileNumber)
- assert.NoError(t, err)
- _, err = s.writeTraceIDAndPostings(paramtools.NewParams(",config=565,arch=arm,"), tileNumber)
- assert.NoError(t, err)
- _, err = s.writeTraceIDAndPostings(paramtools.NewParams(",config=8888,arch=arm64,"), tileNumber)
- assert.NoError(t, err)
- _, err = s.writeTraceIDAndPostings(paramtools.NewParams(",config=gpu,arch=x86_64,"), tileNumber)
- assert.NoError(t, err)
+func TestGetOrderedParamSet(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
ops, err := s.GetOrderedParamSet(ctx, 1)
assert.NoError(t, err)
expected := paramtools.ParamSet{
- "arch": []string{"arm", "arm64", "x86", "x86_64"},
- "config": []string{"565", "8888", "gpu"},
+ "arch": []string{"x86"},
+ "config": []string{"565", "8888"},
}
assert.Equal(t, expected, ops.ParamSet)
assert.Equal(t, []string{"arch", "config"}, ops.KeyOrder)
}
-func testGetOrderedParamSet_Empty(t *testing.T, s *SQLTraceStore) {
- ctx := context.Background()
+func TestGetOrderedParamSet_Empty(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, false)
+ defer cleanup()
// Test the empty case where there is no data in datastore.
ops, err := s.GetOrderedParamSet(ctx, 1)
@@ -447,27 +356,20 @@
assert.Equal(t, paramtools.ParamSet{}, ops.ParamSet)
}
-func testCountIndices(t *testing.T, s *SQLTraceStore) {
- ctx := context.Background()
-
- const tileNumber types.TileNumber = 1
- // Now add some trace ids.
- _, err := s.writeTraceIDAndPostings(paramtools.NewParams(",config=8888,arch=x86,"), tileNumber)
- assert.NoError(t, err)
- _, err = s.writeTraceIDAndPostings(paramtools.NewParams(",config=565,arch=arm,"), tileNumber)
- assert.NoError(t, err)
- _, err = s.writeTraceIDAndPostings(paramtools.NewParams(",config=8888,arch=arm64,"), tileNumber)
- assert.NoError(t, err)
- _, err = s.writeTraceIDAndPostings(paramtools.NewParams(",config=gpu,arch=x86_64,"), tileNumber)
- assert.NoError(t, err)
+func TestCountIndices(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, false)
+ defer cleanup()
count, err := s.CountIndices(ctx, 1)
assert.NoError(t, err)
- assert.Equal(t, int64(8), count)
+
+	// CountIndices always returns 0 in this implementation.
+ assert.Equal(t, int64(0), count)
}
-func testCountIndices_Empty(t *testing.T, s *SQLTraceStore) {
- ctx := context.Background()
+func TestCountIndices_Empty(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, false)
+ defer cleanup()
// Test the empty case where there is no data in datastore.
count, err := s.CountIndices(ctx, 1)
@@ -475,17 +377,18 @@
assert.Equal(t, int64(0), count)
}
-func testGetSource(t *testing.T, s *SQLTraceStore) {
- populatedTestDB(t, s)
- ctx := context.Background()
+func TestGetSource(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
filename, err := s.GetSource(ctx, types.CommitNumber(2), ",arch=x86,config=8888,")
- assert.NoError(t, err)
+ require.NoError(t, err)
assert.Equal(t, "gs://perf-bucket/2020/02/08/12/testdata.json", filename)
}
-func testGetSource_Empty(t *testing.T, s *SQLTraceStore) {
- ctx := context.Background()
+func TestGetSource_Empty(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, true)
+ defer cleanup()
// Confirm the call works with an empty tracestore.
filename, err := s.GetSource(ctx, types.CommitNumber(5), ",arch=x86,config=8888,")
@@ -493,6 +396,29 @@
assert.Equal(t, "", filename)
}
+func TestSQLTraceStore_TileNumber(t *testing.T) {
+ _, s, cleanup := commonTestSetup(t, false)
+ defer cleanup()
+
+ assert.Equal(t, types.TileNumber(0), s.TileNumber(types.CommitNumber(1)))
+ assert.Equal(t, types.TileNumber(1), s.TileNumber(types.CommitNumber(9)))
+}
+
+func TestSQLTraceStore_TileSize(t *testing.T) {
+ _, s, cleanup := commonTestSetup(t, false)
+ defer cleanup()
+
+ assert.Equal(t, testTileSize, s.TileSize())
+}
+
+func TestSQLTraceStore_WriteIndices_Success(t *testing.T) {
+ ctx, s, cleanup := commonTestSetup(t, false)
+ defer cleanup()
+
+	// WriteIndices is a no-op in the SQL-backed trace store.
+ assert.NoError(t, s.WriteIndices(ctx, types.TileNumber(1)))
+}
+
func TestCommitNumberOfTileStart(t *testing.T) {
unittest.SmallTest(t)
s := &SQLTraceStore{
@@ -530,36 +456,16 @@
require.NoError(t, err)
}
-// subTestFunction is a func we will call to test one aspect of *SQLTraceStore.
-type subTestFunction func(t *testing.T, s *SQLTraceStore)
-
-// subTests are all the tests we have for *SQLTraceStore.
-var subTests = map[string]subTestFunction{
- "testUpdateSourceFile": testUpdateSourceFile,
- "testWriteTraceIDAndPostings": testWriteTraceIDAndPostings,
- "testParamSetForTile": testParamSetForTile,
- "testParamSetForTile_Empty": testParamSetForTile_Empty,
- "testGetLatestTile": testGetLatestTile,
- "testGetLatestTile_Empty": testGetLatestTile_Empty,
- "testGetOrderedParamSet": testGetOrderedParamSet,
- "testGetOrderedParamSet_Empty": testGetOrderedParamSet_Empty,
- "testCountIndices": testCountIndices,
- "testCountIndices_Empty": testCountIndices_Empty,
- "testGetSource_Empty": testGetSource_Empty,
- "testReadTraces": testReadTraces,
- "testWriteTraces_MultipleBatches_Success": testWriteTraces_MultipleBatches_Success,
- "testReadTraces_InvalidKey": testReadTraces_InvalidKey,
- "testReadTraces_NoResults": testReadTraces_NoResults,
- "testReadTraces_EmptyTileReturnsNoData": testReadTraces_EmptyTileReturnsNoData,
- "testQueryTracesIDOnlyByIndex_EmptyQueryReturnsError": testQueryTracesIDOnlyByIndex_EmptyQueryReturnsError,
- "testQueryTracesIDOnlyByIndex_EmptyTileReturnsEmptyParamset": testQueryTracesIDOnlyByIndex_EmptyTileReturnsEmptyParamset,
- "testQueryTracesIDOnlyByIndex_MatchesOneTrace": testQueryTracesIDOnlyByIndex_MatchesOneTrace,
- "testQueryTracesIDOnlyByIndex_MatchesTwoTraces": testQueryTracesIDOnlyByIndex_MatchesTwoTraces,
- "testQueryTracesByIndex_MatchesOneTrace": testQueryTracesByIndex_MatchesOneTrace,
- "testQueryTracesByIndex_MatchesOneTraceInTheSecondTile": testQueryTracesByIndex_MatchesOneTraceInTheSecondTile,
- "testQueryTracesByIndex_MatchesTwoTraces": testQueryTracesByIndex_MatchesTwoTraces,
- "testQueryTracesByIndex_QueryHasUnknownParamReturnsNoError": testQueryTracesByIndex_QueryHasUnknownParamReturnsNoError,
- "testQueryTracesByIndex_QueryAgainstTileWithNoDataReturnsNoError": testQueryTracesByIndex_QueryAgainstTileWithNoDataReturnsNoError,
- "testTraceCount": testTraceCount,
- "testGetSource": testGetSource,
+func Test_traceIDForSQLFromTraceName_Success(t *testing.T) {
+ unittest.SmallTest(t)
+ /*
+ $ python3
+ Python 3.7.3 (default, Dec 20 2019, 18:57:59)
+ [GCC 8.3.0] on linux
+ Type "help", "copyright", "credits" or "license" for more information.
+ >>> import hashlib
+ >>> hashlib.md5(b",arch=x86,config=8888,").hexdigest()
+ 'fe385b159ff55dca481069805e5ff050'
+ */
+ assert.Equal(t, traceIDForSQL(`\xfe385b159ff55dca481069805e5ff050`), traceIDForSQLFromTraceName(",arch=x86,config=8888,"))
}
diff --git a/perf/go/tracestore/tracestore.go b/perf/go/tracestore/tracestore.go
index b3df325..4e082b1 100644
--- a/perf/go/tracestore/tracestore.go
+++ b/perf/go/tracestore/tracestore.go
@@ -40,6 +40,7 @@
// QueryTracesIDOnlyByIndex returns a stream of ParamSets that match the
// given query.
+ // TODO(jcgregorio) Change to just return count and ParamSet.
QueryTracesIDOnlyByIndex(ctx context.Context, tileNumber types.TileNumber, q *query.Query) (<-chan paramtools.Params, error)
// ReadTraces loads the traces for the given trace keys.
diff --git a/perf/migrations/cdb.sql b/perf/migrations/cdb.sql
index cfd8b76..abbe312 100644
--- a/perf/migrations/cdb.sql
+++ b/perf/migrations/cdb.sql
@@ -146,7 +146,7 @@
TraceNames
INNER JOIN TraceValues2 ON TraceValues2.trace_id = TraceNames.trace_id
WHERE
- TraceNames.params ->> 'arch' IN ('x86')
- AND TraceNames.params ->> 'config' IN ('565', '8888')
- AND TraceValues2.commit_number >= 0
- AND TraceValues2.commit_number < 255;
\ No newline at end of file
+ TraceValues2.commit_number >= 0
+ AND TraceValues2.commit_number < 255
+ AND TraceNames.params ->> 'arch' IN ('x86')
+ AND TraceNames.params ->> 'config' IN ('565', '8888');
\ No newline at end of file
diff --git a/perf/migrations/cockroachdb/0002_create_initial_tables.up.sql b/perf/migrations/cockroachdb/0002_create_initial_tables.up.sql
index 1614a69..30452d6 100644
--- a/perf/migrations/cockroachdb/0002_create_initial_tables.up.sql
+++ b/perf/migrations/cockroachdb/0002_create_initial_tables.up.sql
@@ -17,5 +17,5 @@
val REAL,
-- Id of the source filename, from SourceFiles.
source_file_id INT,
- PRIMARY KEY (trace_id, commit_number)
+ PRIMARY KEY (commit_number, trace_id)
);
\ No newline at end of file