Skip to content

Commit

Permalink
Add Log::Timeline support to HTTP client
Browse files Browse the repository at this point in the history
  • Loading branch information
jnthn committed Jan 25, 2021
1 parent 26336f5 commit 7751b2a
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 12 deletions.
77 changes: 65 additions & 12 deletions lib/Cro/HTTP/Client.pm6
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use OO::Monitors;
use Cro::HTTP::Client::CookieJar;
use Cro::HTTP::Internal;
use Cro::HTTP::Header;
use Cro::HTTP::LogTimelineSchema;
use Cro::HTTP::Request;
use Cro::HTTP::RequestSerializer;
use Cro::HTTP::ResponseParser;
Expand Down Expand Up @@ -418,6 +419,12 @@ class Cro::HTTP::Client {
#| Make a HTTP request, specifying the HTTP method (GET, POST, etc.),
#| the URL, and any further request options.
multi method request(Str $method, $url, %options --> Promise) {
my $parent = %options<PARENT-REQUEST-LOG>;
my $request-log = $parent
?? Cro::HTTP::LogTimeline::Request.start($parent, :$method, :url(~$url))
!! Cro::HTTP::LogTimeline::Request.start(:$method, :url(~$url));
CATCH { $request-log.end }

my $parsed-url = self && $!base-uri
?? $!base-uri.add($url)
!! Cro::Uri::HTTP.parse($url);
Expand All @@ -441,13 +448,15 @@ class Cro::HTTP::Client {
my $enable-push = self ?? $!push-promises // %options<push-promises> !! %options<push-promises>;

Promise(supply {
whenever self!get-pipeline($proxy-url // $parsed-url, $http, ca => %options<ca>, :$enable-push) -> $pipeline {
whenever self!get-pipeline($proxy-url // $parsed-url, $http, $request-log, ca => %options<ca>, :$enable-push) -> $pipeline {
if $pipeline !~~ Pipeline2 {
unless self.persistent || $request-object.has-header('connection') {
$request-object.append-header('Connection', 'close');
}
}
whenever $pipeline.send-request($request-object) {
QUIT { $request-log.end }

# Consider adding the connection back into the cache to use it
# again.
if self && $!persistent {
Expand All @@ -471,7 +480,10 @@ class Cro::HTTP::Client {
}
if .status $redirect-codes && ($follow !=== False) {
my $remain = $follow === True ?? 4 !! $follow.Int - 1;
die X::Cro::HTTP::Client::TooManyRedirects.new if $remain < 0;
if $remain < 0 {
$request-log.end;
die X::Cro::HTTP::Client::TooManyRedirects.new;
}
my $new-method = .status == 302|303 ?? 'GET' !! $method;
my %new-opts = %options;
%new-opts<follow> = $remain;
Expand All @@ -484,12 +496,20 @@ class Cro::HTTP::Client {
$new-url = .header('location').starts-with('/')
?? construct-url($_.header('location'))
!! .header('location');
%new-opts<PARENT-REQUEST-LOG> = $request-log;
Cro::HTTP::LogTimeline::Redirected.log($request-log, :status(.status), :url($new-url));
my $req = self.request($new-method, $new-url, %new-opts);
whenever $req { .emit };
CATCH { $request-log.end; }
whenever $req {
QUIT { $request-log.end; }
$request-log.end;
.emit
};
} else {
if self && $.cookie-jar.defined {
$.cookie-jar.add-from-response($_, $parsed-url);
}
$request-log.end;
.emit
}
} elsif 400 <= .status < 500 {
Expand All @@ -502,11 +522,22 @@ class Cro::HTTP::Client {
if .status == 401 && (%options<auth><if-asked>:exists) {
my %opts = %options;
%opts<auth><if-asked>:delete;
whenever self.request($method, $parsed-url, %options) { .emit };
%opts<PARENT-REQUEST-LOG> = $request-log;
Cro::HTTP::LogTimeline::AuthorizationRequested.log($request-log);
CATCH { $request-log.end; }
whenever self.request($method, $parsed-url, %opts) {
QUIT { $request-log.end; }
$request-log.end;
.emit;
};
} else {
Cro::HTTP::LogTimeline::ErrorResponse.log($request-log, :status(.status));
$request-log.end;
die X::Cro::HTTP::Error::Client.new(response => $_);
}
} elsif .status >= 500 {
Cro::HTTP::LogTimeline::ErrorResponse.log($request-log, :status(.status));
$request-log.end;
die X::Cro::HTTP::Error::Server.new(response => $_);
}
}
Expand Down Expand Up @@ -538,20 +569,26 @@ class Cro::HTTP::Client {
return False;
}

method !get-pipeline(Cro::Uri $url, $http, :$ca, :$enable-push) {
method !get-pipeline(Cro::Uri $url, $http, $log-parent, :$ca, :$enable-push) {
my $secure = $url.scheme.lc eq 'https';
my $host = $url.host;
my $port = $url.port // ($secure ?? 443 !! 80);
if self && $!connection-cache.pipeline-for($secure, $host, $port, $http) -> $pipeline {
my $p = Promise.new;
$p.keep($pipeline);
$p
Cro::HTTP::LogTimeline::ReuseConnection.log($log-parent,
:$host, :$port,
:secure($secure ?? 'Yes' !! 'No'),
:protocol(describe-protocol($http)));
Promise.kept($pipeline)
}
else {
self!build-pipeline($secure, $host, $port, $http, :$ca, :$enable-push)
self!build-pipeline($secure, $host, $port, $http, $log-parent, :$ca, :$enable-push)
}
}

sub describe-protocol($http) {
$http eq '1.1' | '2' ?? $http !! 'Any'
}

my class VersionDecisionNotifier does Cro::Transform {
has $.promise;
has $.result;
Expand All @@ -564,7 +601,11 @@ class Cro::HTTP::Client {
}
}

method !build-pipeline($secure, $host, $port, $http, :$ca, :$enable-push) {
method !build-pipeline($secure, $host, $port, $http, $log-parent, :$ca, :$enable-push) {
my $log-connection = Cro::HTTP::LogTimeline::EstablishConnection.start(
$log-parent, :$host, :$port,
:secure($secure ?? 'Yes' !! 'No'),
:protocol(describe-protocol($http)));
my @parts;
my $version-decision = Promise.new;
my $supports-alpn = supports-alpn();
Expand Down Expand Up @@ -625,10 +666,10 @@ class Cro::HTTP::Client {
my $in = Supplier::Preserving.new;
my %ca = self ?? (self.ca // $ca // {}) !! $ca // {};
my $out = $version-decision
?? $connector.establish($in.Supply, :$host, :$port, |{%tls-config, %ca})
?? establish($connector, $in.Supply, $log-connection, :$host, :$port, |{%tls-config, %ca})
!! do {
my $s = Supplier::Preserving.new;
$connector.establish($in.Supply, :$host, :$port, |{%tls-config, %ca}).tap:
establish($connector, $in.Supply, $log-connection, :$host, :$port, |{%tls-config, %ca}).tap:
{ $s.emit($_) },
done => { $s.done },
quit => {
Expand All @@ -644,6 +685,18 @@ class Cro::HTTP::Client {
}
}

sub establish(Cro::Connector $connector, Supply $incoming, $log-connection, *%options) {
supply {
my Promise $connection = $connector.connect(|%options);
$connection.then({ $log-connection.end });
whenever $connection -> Cro::Transform $transform {
whenever $transform.transformer($incoming) -> $msg {
emit $msg;
}
}
}
}

method !assemble-request(Str $method, Cro::Uri $base-url, Cro::Uri $proxy-url, %options --> Cro::HTTP::Request) {
# Add any query string parameters.
my $url;
Expand Down
13 changes: 13 additions & 0 deletions lib/Cro/HTTP/LogTimelineSchema.pm6
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,16 @@ class Cro::HTTP::LogTimeline::ResponseMiddleware
does Log::Timeline::Task['Cro', 'HTTP Server', 'Response Middleware'] {}
class Cro::HTTP::LogTimeline::ResponseBody
does Log::Timeline::Task['Cro', 'HTTP Server', 'Send Response Body'] {}

class Cro::HTTP::LogTimeline::Request
does Log::Timeline::Task['Cro', 'HTTP Client', 'Request'] {}
class Cro::HTTP::LogTimeline::ReuseConnection
does Log::Timeline::Event['Cro', 'HTTP Client', 'Reuse Connection'] {}
class Cro::HTTP::LogTimeline::EstablishConnection
does Log::Timeline::Task['Cro', 'HTTP Client', 'Establish Connection'] {}
class Cro::HTTP::LogTimeline::ErrorResponse
does Log::Timeline::Event['Cro', 'HTTP Client', 'Error Response'] {}
class Cro::HTTP::LogTimeline::AuthorizationRequested
does Log::Timeline::Event['Cro', 'HTTP Client', 'Authorization Requested'] {}
class Cro::HTTP::LogTimeline::Redirected
does Log::Timeline::Event['Cro', 'HTTP Client', 'Redirect'] {}

0 comments on commit 7751b2a

Please sign in to comment.