Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add endpoint for querying time info for lsn #5497

Merged
merged 28 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a103b76
Add function to map over all timestamps
arpad-m Oct 7, 2023
184f266
Add get_time_range_by_lsn endpoint
arpad-m Oct 7, 2023
60e5777
Add tests
arpad-m Oct 7, 2023
24d56ed
Fix API spec yml
arpad-m Oct 7, 2023
b35b2e7
Configurable step size and fix SQL injection ability
arpad-m Oct 7, 2023
e2084ae
Add median as well
arpad-m Oct 10, 2023
4ba7744
More tests
arpad-m Oct 10, 2023
4a8031e
more
arpad-m Oct 10, 2023
381e286
Merge branch 'main' into arpad/ts_by_lsn
shanyp Oct 10, 2023
dd9769d
make clippy happy
shanyp Oct 10, 2023
00e1010
Enable synchronous commit
arpad-m Oct 10, 2023
34586de
Return runtime errors
arpad-m Oct 10, 2023
fdf4fca
Review comment
arpad-m Oct 10, 2023
07fcfb7
newlines
arpad-m Oct 10, 2023
ebdcb4a
Merge remote-tracking branch 'origin/main' into arpad/ts_by_lsn
arpad-m Oct 16, 2023
a72c339
Use a struct instead of the json macro and return a 404 status
arpad-m Oct 16, 2023
8d1ccf4
Update API spec and test
arpad-m Oct 16, 2023
6bc3a58
Use duration_since not elapsed
arpad-m Oct 17, 2023
72467b1
Only return one value instead of a range
arpad-m Oct 18, 2023
c6fecef
Remove test
arpad-m Oct 18, 2023
ac95bca
fix test
arpad-m Oct 18, 2023
33e1a48
Make clippy happy
arpad-m Oct 18, 2023
fc017bf
Use strptime to be Python 3.9 compatible
arpad-m Oct 18, 2023
f136d71
Adjust descriptions in yaml
arpad-m Oct 18, 2023
d9b3765
Rename API call to be consistent with get_lsn_by_timestamp
arpad-m Oct 18, 2023
ca84f2c
fix strptime to support nanoseconds
shanyp Oct 18, 2023
ebea344
Merge branch 'main' into arpad/ts_by_lsn
shanyp Oct 18, 2023
f6946e9
Make test more robust
arpad-m Oct 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libs/postgres_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;

// Export some version independent functions that are used outside of this mod
pub use v14::xlog_utils::encode_logical_message;
pub use v14::xlog_utils::from_pg_timestamp;
pub use v14::xlog_utils::get_current_timestamp;
pub use v14::xlog_utils::to_pg_timestamp;
pub use v14::xlog_utils::XLogFileName;
Expand Down
61 changes: 51 additions & 10 deletions libs/postgres_ffi/src/xlog_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,42 @@ pub fn get_current_timestamp() -> TimestampTz {
to_pg_timestamp(SystemTime::now())
}

pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
const UNIX_EPOCH_JDATE: u64 = 2440588; /* == date2j(1970, 1, 1) */
const POSTGRES_EPOCH_JDATE: u64 = 2451545; /* == date2j(2000, 1, 1) */
// Module to reduce the scope of the constants
mod timestamp_conversions {
use std::time::Duration;

use super::*;

const UNIX_EPOCH_JDATE: u64 = 2440588; // == date2j(1970, 1, 1)
const POSTGRES_EPOCH_JDATE: u64 = 2451545; // == date2j(2000, 1, 1)
const SECS_PER_DAY: u64 = 86400;
const USECS_PER_SEC: u64 = 1000000;
match time.duration_since(SystemTime::UNIX_EPOCH) {
Ok(n) => {
((n.as_secs() - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY))
* USECS_PER_SEC
+ n.subsec_micros() as u64) as i64
const SECS_DIFF_UNIX_TO_POSTGRES_EPOCH: u64 =
(POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY;

pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
match time.duration_since(SystemTime::UNIX_EPOCH) {
Ok(n) => {
((n.as_secs() - SECS_DIFF_UNIX_TO_POSTGRES_EPOCH) * USECS_PER_SEC
+ n.subsec_micros() as u64) as i64
}
Err(_) => panic!("SystemTime before UNIX EPOCH!"),
}
Err(_) => panic!("SystemTime before UNIX EPOCH!"),
}

pub fn from_pg_timestamp(time: TimestampTz) -> SystemTime {
let time: u64 = time
.try_into()
.expect("timestamp before millenium (postgres epoch)");
let since_unix_epoch = time + SECS_DIFF_UNIX_TO_POSTGRES_EPOCH * USECS_PER_SEC;
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_micros(since_unix_epoch))
.expect("SystemTime overflow")
}
}

pub use timestamp_conversions::{from_pg_timestamp, to_pg_timestamp};

// Returns (aligned) end_lsn of the last record in data_dir with WAL segments.
// start_lsn must point to some previously known record boundary (beginning of
// the next record). If no valid record after is found, start_lsn is returned
Expand Down Expand Up @@ -481,4 +502,24 @@ pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
wal
}

// If you need to craft WAL and write tests for this module, put it at wal_craft crate.
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_ts_conversion() {
let now = SystemTime::now();
let round_trip = from_pg_timestamp(to_pg_timestamp(now));

let now_since = now.duration_since(SystemTime::UNIX_EPOCH).unwrap();
let round_trip_since = round_trip.duration_since(SystemTime::UNIX_EPOCH).unwrap();
assert_eq!(now_since.as_micros(), round_trip_since.as_micros());

let now_pg = get_current_timestamp();
let round_trip_pg = to_pg_timestamp(from_pg_timestamp(now_pg));

assert_eq!(now_pg, round_trip_pg);
}

// If you need to craft WAL and write tests for this module, put it at wal_craft crate.
}
61 changes: 61 additions & 0 deletions pageserver/src/http/openapi_spec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,67 @@ paths:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"

/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_timestamp_of_lsn:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
get:
description: Get timestamp for a given LSN
parameters:
- name: lsn
in: query
required: true
schema:
type: integer
description: A LSN to get the timestamp
responses:
"200":
description: OK
content:
application/json:
schema:
type: string
format: date-time
"400":
description: Error when no tenant id found in path, no timeline id or invalid timestamp
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"404":
description: Timeline not found, or there is no timestamp information for the given lsn
content:
application/json:
schema:
$ref: "#/components/schemas/NotFoundError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"

/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp:
parameters:
Expand Down
33 changes: 33 additions & 0 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
//! Management HTTP API
//!
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::{anyhow, Context, Result};
use futures::TryFutureExt;
use humantime::format_rfc3339;
use hyper::header::CONTENT_TYPE;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
Expand Down Expand Up @@ -502,6 +504,33 @@ async fn get_lsn_by_timestamp_handler(
json_response(StatusCode::OK, result)
}

async fn get_timestamp_of_lsn_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;

let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;

let lsn_str = must_get_query_param(&request, "lsn")?;
let lsn = Lsn::from_str(&lsn_str)
.with_context(|| format!("Invalid LSN: {lsn_str:?}"))
.map_err(ApiError::BadRequest)?;

let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?;

match result {
Some(time) => {
let time = format_rfc3339(postgres_ffi::from_pg_timestamp(time)).to_string();
json_response(StatusCode::OK, time)
}
None => json_response(StatusCode::NOT_FOUND, ()),
}
}

async fn tenant_attach_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
Expand Down Expand Up @@ -1680,6 +1709,10 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp",
|r| api_handler(r, get_lsn_by_timestamp_handler),
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/get_timestamp_of_lsn",
|r| api_handler(r, get_timestamp_of_lsn_handler),
)
.put("/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc", |r| {
api_handler(r, timeline_gc_handler)
})
Expand Down
56 changes: 49 additions & 7 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::ops::ControlFlow;
use std::ops::Range;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
Expand Down Expand Up @@ -370,7 +371,6 @@ impl Timeline {
}
}

///
/// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
/// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
///
Expand All @@ -385,6 +385,50 @@ impl Timeline {
found_larger: &mut bool,
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
if timestamp >= search_timestamp {
*found_larger = true;
return ControlFlow::Break(true);
} else {
*found_smaller = true;
}
ControlFlow::Continue(())
})
.await
}

/// Obtain the possible timestamp range for the given lsn.
///
/// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
pub async fn get_timestamp_for_lsn(
&self,
probe_lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<TimestampTz>, PageReconstructError> {
let mut max: Option<TimestampTz> = None;
self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
if let Some(max_prev) = max {
max = Some(max_prev.max(timestamp));
} else {
max = Some(timestamp);
}
ControlFlow::Continue(())
})
.await?;

Ok(max)
}

/// Runs the given function on all the timestamps for a given lsn
///
/// The return value is either given by the closure, or set to the `Default`
/// impl's output.
async fn map_all_timestamps<T: Default>(
&self,
probe_lsn: Lsn,
ctx: &RequestContext,
mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
) -> Result<T, PageReconstructError> {
for segno in self
.list_slru_segments(SlruKind::Clog, probe_lsn, ctx)
.await?
Expand All @@ -402,16 +446,14 @@ impl Timeline {
timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]);
let timestamp = TimestampTz::from_be_bytes(timestamp_bytes);

if timestamp >= search_timestamp {
*found_larger = true;
return Ok(true);
} else {
*found_smaller = true;
match f(timestamp) {
ControlFlow::Break(b) => return Ok(b),
ControlFlow::Continue(()) => (),
}
}
}
}
Ok(false)
Ok(Default::default())
}

/// Get a list of SLRU segments
Expand Down
9 changes: 9 additions & 0 deletions test_runner/fixtures/pageserver/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,15 @@ def timeline_get_lsn_by_timestamp(
res_json = res.json()
return res_json

def timeline_get_timestamp_of_lsn(self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
log.info(f"Requesting time range of lsn {lsn}, tenant {tenant_id}, timeline {timeline_id}")
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_timestamp_of_lsn?lsn={lsn}",
)
self.verbose_error(res)
res_json = res.json()
return res_json

def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
self.is_testing_enabled_or_skip()

Expand Down
Loading