download + extract pipeline

- artifact.Download: resumable HTTP with optional SHA256 check + progress cb
- artifact.ExtractZstdTar: streamed zstd+tar with tar-slip defense
- aggregator client matches real API shape (digests/immutables/ancillary blocks
  with URIHolder polymorphism for templated immutable URIs)
- cmd: show + download subcommands wired up
- end-to-end verified against preprod: digests archive pulls cleanly, yields
  16836-entry SHA manifest ready for verification sprint

deps: github.com/klauspost/compress (pure-go zstd)
This commit is contained in:
Kayos 2026-04-23 15:16:48 -07:00
parent f87b7fc3c4
commit e557d85d5a
6 changed files with 483 additions and 84 deletions

View file

@ -1,7 +1,7 @@
// Package aggregator is a thin HTTP client for the Mithril aggregator REST API.
//
// Only the endpoints needed for client-side snapshot workflows are exposed.
// Authentication is not required for the read paths used here.
package aggregator
import (
@ -23,57 +23,98 @@ type Client struct {
// New returns a Client for the aggregator rooted at baseURL.
// Trailing slashes are stripped from baseURL before it is stored.
func New(baseURL string) *Client {
	return &Client{
		baseURL: strings.TrimRight(baseURL, "/"),
		// Whole-request timeout. Two values (60s and 120s) were present for
		// this field; the later, longer one is kept.
		http: &http.Client{Timeout: 120 * time.Second},
	}
}
// CardanoDBSnapshot is the server shape for /artifact/cardano-database and its
// /{hash} detail endpoint. The list response omits {digests,immutables,ancillary};
// only the detail endpoint populates them.
//
// The field set is trimmed to what the client consumes; the full schema is
// documented at https://mithril.network/doc/aggregator-api/.
type CardanoDBSnapshot struct {
	Hash                    string    `json:"hash"`
	MerkleRoot              string    `json:"merkle_root"`
	Network                 string    `json:"network"`
	Beacon                  Beacon    `json:"beacon"`
	CertificateHash         string    `json:"certificate_hash"`
	TotalDBSizeUncompressed uint64    `json:"total_db_size_uncompressed"`
	CardanoNodeVersion      string    `json:"cardano_node_version"`
	CreatedAt               time.Time `json:"created_at"`

	// The three blocks below are left at their zero value when decoding a
	// list response (see the type comment).
	Digests    DigestsBlock   `json:"digests"`
	Immutables ImmutsBlock    `json:"immutables"`
	Ancillary  AncillaryBlock `json:"ancillary"`
}
// Beacon is the "beacon" object of a snapshot: the epoch and the immutable
// file number reported alongside it.
type Beacon struct {
	Epoch               uint64 `json:"epoch"`
	ImmutableFileNumber uint64 `json:"immutable_file_number"`
}
type LocationList struct {
Size uint64 `json:"size"`
Locations []LocationAlt `json:"locations"`
type DigestsBlock struct {
SizeUncompressed uint64 `json:"size_uncompressed"`
Locations []Location `json:"locations"`
}
// LocationAlt is a best-of alternative; Mithril returns a typed-discriminated object.
type LocationAlt struct {
Type string `json:"type"` // e.g. "cloud_storage", "ipfs"
URI string `json:"uri"`
type ImmutsBlock struct {
AverageSizeUncompressed uint64 `json:"average_size_uncompressed"`
Locations []Location `json:"locations"`
}
type IncrementalImmutables struct {
AverageSize uint64 `json:"average_size"`
Locations []LocationAlt `json:"locations"`
type AncillaryBlock struct {
SizeUncompressed uint64 `json:"size_uncompressed"`
Locations []Location `json:"locations"`
}
// Location is one download location for an artifact. Its URI is polymorphic:
// the Mithril API ships URI as either a plain string (for single artifacts)
// or as {"Template": "..."} for templated per-file URIs (immutables only).
type Location struct {
// Type is the location kind as reported by the server.
Type string `json:"type"`
// URI holds whichever of the two URI shapes the server sent.
URI URIHolder `json:"uri"`
// CompressionAlgorithm is optional and omitted from encoded output when empty.
CompressionAlgorithm string `json:"compression_algorithm,omitempty"`
}
// URIHolder absorbs both string and templated-object URI shapes.
// At most one of the two fields is populated after decoding.
type URIHolder struct {
	Plain    string // set when the JSON value was a bare string
	Template string // set when the JSON value was {"Template": "..."}
}

// UnmarshalJSON implements json.Unmarshaler.
//
// Accepted inputs are a JSON string (stored in Plain), an object carrying a
// "Template" string (stored in Template), and null (leaves the holder empty).
// Anything else — including an object without a "Template" key — is rejected,
// so an unknown URI shape surfaces here instead of as an empty URI later.
// The holder is fully overwritten on success, so decoding into a reused value
// never leaves a stale field from a previous decode.
func (h *URIHolder) UnmarshalJSON(b []byte) error {
	// Try plain string first (this also covers null, which yields "").
	var s string
	if err := json.Unmarshal(b, &s); err == nil {
		*h = URIHolder{Plain: s}
		return nil
	}

	// Fall back to {"Template": "..."}. A pointer distinguishes a missing or
	// null key from an explicitly empty template.
	var t struct {
		Template *string `json:"Template"`
	}
	if err := json.Unmarshal(b, &t); err == nil && t.Template != nil {
		*h = URIHolder{Template: *t.Template}
		return nil
	}

	return fmt.Errorf("unrecognized URI shape: %s", string(b))
}

// String returns whichever URI form is populated, preferring Template.
func (h URIHolder) String() string {
	if h.Template != "" {
		return h.Template
	}
	return h.Plain
}
// Certificate is the server-reported shape for /certificate/{hash}.
// Kept wide — STM verification consumes raw bytes separately from the decoded
// view, so ProtocolMessage and Multisignature are retained as undecoded JSON.
type Certificate struct {
	Hash             string          `json:"hash"`
	PreviousHash     string          `json:"previous_hash"`
	Epoch            uint64          `json:"epoch"`
	SignedMessage    string          `json:"signed_message"`
	ProtocolMessage  json.RawMessage `json:"protocol_message"`
	Multisignature   json.RawMessage `json:"multi_signature"`
	GenesisSignature string          `json:"genesis_signature,omitempty"`
}
func (c *Client) get(ctx context.Context, path string, out any) error {
@ -84,12 +125,12 @@ func (c *Client) get(ctx context.Context, path string, out any) error {
req.Header.Set("Accept", "application/json")
resp, err := c.http.Do(req)
if err != nil {
return fmt.Errorf("aggregator GET %s: %w", path, err)
return fmt.Errorf("GET %s: %w", path, err)
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return fmt.Errorf("aggregator GET %s: status %d: %s", path, resp.StatusCode, string(body))
return fmt.Errorf("GET %s: %d: %s", path, resp.StatusCode, string(body))
}
if out == nil {
return nil
@ -97,16 +138,11 @@ func (c *Client) get(ctx context.Context, path string, out any) error {
return json.NewDecoder(resp.Body).Decode(out)
}
// ListCardanoDBSnapshots returns the sorted-newest-first list of cardano-database snapshots.
//
// On failure the slice is nil and the error from the underlying GET is
// returned unchanged.
func (c *Client) ListCardanoDBSnapshots(ctx context.Context) ([]CardanoDBSnapshot, error) {
	var out []CardanoDBSnapshot
	// Decode first, then return. A single `return out, c.get(..., &out)` would
	// depend on when `out` is read relative to the call, which the language
	// does not specify.
	if err := c.get(ctx, "/artifact/cardano-database", &out); err != nil {
		return nil, err
	}
	return out, nil
}
// GetCardanoDBSnapshot fetches details for a single snapshot by hash (or "latest").
func (c *Client) GetCardanoDBSnapshot(ctx context.Context, hash string) (*CardanoDBSnapshot, error) {
var out CardanoDBSnapshot
if err := c.get(ctx, "/artifact/cardano-database/"+url.PathEscape(hash), &out); err != nil {
@ -115,7 +151,6 @@ func (c *Client) GetCardanoDBSnapshot(ctx context.Context, hash string) (*Cardan
return &out, nil
}
// GetCertificate fetches a certificate by hash for signature verification.
func (c *Client) GetCertificate(ctx context.Context, hash string) (*Certificate, error) {
var out Certificate
if err := c.get(ctx, "/certificate/"+url.PathEscape(hash), &out); err != nil {

View file

@ -1,27 +1,136 @@
// Package artifact handles downloading and extracting Mithril snapshot artifacts.
// Downloads are resumable via HTTP Range requests; zstd+tar extraction is
// streamed (see ExtractZstdTar).
package artifact
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"time"
)
var ErrNotImplemented = errors.New("not yet implemented")
// Download fetches uri to destPath, resuming from destPath+".part" if that
// file exists. If expectedSHA256 is non-empty (lowercase hex), the complete
// file is integrity-checked before it is moved into place. progress, when
// non-nil, is called with the running byte count (including resumed bytes),
// at most roughly every 250ms and once more at the end.
//
// Design notes:
//   - No parallel chunks yet; a single streaming GET is fine for sub-GB
//     artifacts and keeps the first working version simple.
//   - Resume uses the HTTP Range header against the existing .part size.
//     A 200 reply means the server ignored the range, so the part is
//     rewritten from scratch. A 416 reply means the part is already at or
//     beyond the remote size: it is promoted if it matches expectedSHA256,
//     otherwise discarded and the download restarted without a range.
//   - destPath is replaced (by rename) only after SHA validation passes.
//     A part that fails validation is deleted so the next attempt does not
//     resume from corrupt data.
func Download(ctx context.Context, uri, destPath, expectedSHA256 string, progress func(bytes int64)) error {
	if err := os.MkdirAll(filepath.Dir(destPath), 0o755); err != nil {
		return fmt.Errorf("mkdir: %w", err)
	}
	partPath := destPath + ".part"

	var existing int64
	if fi, err := os.Stat(partPath); err == nil {
		existing = fi.Size()
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, uri, nil)
	if err != nil {
		return err
	}
	if existing > 0 {
		req.Header.Set("Range", fmt.Sprintf("bytes=%d-", existing))
	}

	client := &http.Client{Timeout: 0} // artifacts can be GB-scale; ctx bounds the request
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("GET %s: %w", uri, err)
	}
	defer resp.Body.Close()

	h := sha256.New()
	var out *os.File
	switch {
	case resp.StatusCode == http.StatusPartialContent && existing > 0:
		// Resuming: the digest must cover the bytes already on disk.
		if err := hashPartFile(h, partPath); err != nil {
			return fmt.Errorf("rehash part: %w", err)
		}
		out, err = os.OpenFile(partPath, os.O_APPEND|os.O_WRONLY, 0o644)
	case resp.StatusCode == http.StatusOK:
		// Server ignored our range; start over.
		existing = 0
		out, err = os.Create(partPath)
	case resp.StatusCode == http.StatusRequestedRangeNotSatisfiable && existing > 0:
		// Release the connection before any further work or the retry.
		resp.Body.Close()
		if expectedSHA256 != "" {
			// The part may be a finished download that was never renamed.
			if err := hashPartFile(h, partPath); err == nil && hex.EncodeToString(h.Sum(nil)) == expectedSHA256 {
				if progress != nil {
					progress(existing)
				}
				return os.Rename(partPath, destPath)
			}
		}
		if err := os.Remove(partPath); err != nil {
			return fmt.Errorf("discard part: %w", err)
		}
		// With the part gone no Range header is sent, so this recurses once.
		return Download(ctx, uri, destPath, expectedSHA256, progress)
	default:
		body, _ := io.ReadAll(io.LimitReader(resp.Body, 2048)) // best-effort error detail
		return fmt.Errorf("GET %s: %d: %s", uri, resp.StatusCode, string(body))
	}
	if err != nil {
		return fmt.Errorf("open part: %w", err)
	}
	defer out.Close() // covers early returns; harmless after the explicit Close below

	w := io.MultiWriter(out, h)
	total := existing
	buf := make([]byte, 256*1024)
	lastProgress := time.Now()
	for {
		n, rerr := resp.Body.Read(buf)
		if n > 0 {
			if _, werr := w.Write(buf[:n]); werr != nil {
				return fmt.Errorf("write: %w", werr)
			}
			total += int64(n)
			if progress != nil && time.Since(lastProgress) > 250*time.Millisecond {
				progress(total)
				lastProgress = time.Now()
			}
		}
		if rerr == io.EOF {
			break
		}
		if rerr != nil {
			return fmt.Errorf("read: %w", rerr)
		}
	}
	if progress != nil {
		progress(total)
	}
	if err := out.Close(); err != nil {
		return err
	}

	if expectedSHA256 != "" {
		got := hex.EncodeToString(h.Sum(nil))
		if got != expectedSHA256 {
			// Best effort: a corrupt part must not be resumed from next time.
			_ = os.Remove(partPath)
			return fmt.Errorf("SHA256 mismatch: want %s, got %s", expectedSHA256, got)
		}
	}

	return os.Rename(partPath, destPath)
}

// hashPartFile streams the file at path into w.
func hashPartFile(w io.Writer, path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	_, err = io.Copy(w, f)
	return err
}
// Extract decompresses a zstd+tar archive into targetDir.
// Will stream through zstd -> tar reader without buffering the full archive.
func Extract(ctx context.Context, archivePath, targetDir string) error {
return ErrNotImplemented
var ErrNoLocations = errors.New("no download locations available")
// DownloadFirst walks uris in order, handing each one to Download with the
// same destPath, expectedSHA256 and progress callback, and returns nil as soon
// as one attempt succeeds. An empty uris slice yields ErrNoLocations. If every
// attempt fails, the returned error wraps only the error of the final attempt.
func DownloadFirst(ctx context.Context, uris []string, destPath, expectedSHA256 string, progress func(int64)) error {
	if len(uris) == 0 {
		return ErrNoLocations
	}

	var lastErr error
	for i := 0; i < len(uris); i++ {
		lastErr = Download(ctx, uris[i], destPath, expectedSHA256, progress)
		if lastErr == nil {
			return nil
		}
	}
	return fmt.Errorf("all locations failed: last error: %w", lastErr)
}

View file

@ -0,0 +1,91 @@
package artifact
import (
"archive/tar"
"context"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"github.com/klauspost/compress/zstd"
)
// ExtractZstdTar decompresses a .tar.zst archive into targetDir, streaming
// through the reader without buffering the full archive.
//
// Safety rules (tar-slip defense):
//   - entries whose cleaned name is absolute, is "..", or starts with a ".."
//     path element are refused;
//   - the joined output path must be targetDir itself or lie beneath it;
//   - symlink and hard-link entries are refused outright.
//
// Regular files are created with the permission bits from the archive header.
// Directories additionally always get owner rwx, so a restrictive directory
// mode in the archive cannot block extraction of that directory's own
// children. Entry types other than directory, regular file and link are
// skipped. ctx is checked between entries; the first error aborts extraction
// and leaves already-written files in place.
func ExtractZstdTar(ctx context.Context, archivePath, targetDir string) error {
	f, err := os.Open(archivePath)
	if err != nil {
		return fmt.Errorf("open archive: %w", err)
	}
	defer f.Close()

	zr, err := zstd.NewReader(f)
	if err != nil {
		return fmt.Errorf("zstd reader: %w", err)
	}
	defer zr.Close()

	tr := tar.NewReader(zr)

	if err := os.MkdirAll(targetDir, 0o755); err != nil {
		return fmt.Errorf("mkdir target: %w", err)
	}
	cleanTarget, err := filepath.Abs(targetDir)
	if err != nil {
		return err
	}
	sep := string(os.PathSeparator)

	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		hdr, err := tr.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return fmt.Errorf("tar next: %w", err)
		}

		// tar-slip defense. Only a leading ".." path element is suspicious;
		// a name that merely begins with two dots (e.g. "..data") is fine.
		clean := filepath.Clean(hdr.Name)
		if filepath.IsAbs(clean) || clean == ".." || strings.HasPrefix(clean, ".."+sep) {
			return fmt.Errorf("refusing suspicious archive path: %s", hdr.Name)
		}
		// filepath.Join returns a cleaned path, so a prefix test is sufficient.
		outPath := filepath.Join(cleanTarget, clean)
		if outPath != cleanTarget && !strings.HasPrefix(outPath, cleanTarget+sep) {
			return fmt.Errorf("refusing path outside target: %s", hdr.Name)
		}

		// Keep only the rwx permission bits of the archived mode.
		perm := os.FileMode(hdr.Mode).Perm()

		switch hdr.Typeflag {
		case tar.TypeDir:
			// Owner rwx is forced so entries below this directory can be written.
			if err := os.MkdirAll(outPath, perm|0o700); err != nil {
				return err
			}
		case tar.TypeReg:
			if err := os.MkdirAll(filepath.Dir(outPath), 0o755); err != nil {
				return err
			}
			if err := extractRegular(outPath, perm, tr); err != nil {
				return err
			}
		case tar.TypeSymlink, tar.TypeLink:
			// Refuse links for safety — a Mithril archive has no legitimate reason to contain them.
			return fmt.Errorf("refusing link entry: %s", hdr.Name)
		default:
			// Silently skip unknown types.
		}
	}
	return nil
}

// extractRegular writes the current tar entry body from r to outPath with the given permissions.
func extractRegular(outPath string, perm os.FileMode, r io.Reader) error {
	out, err := os.OpenFile(outPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, perm)
	if err != nil {
		return fmt.Errorf("create %s: %w", outPath, err)
	}
	if _, err := io.Copy(out, r); err != nil {
		out.Close()
		return fmt.Errorf("write %s: %w", outPath, err)
	}
	return out.Close()
}