Skip to content

Supporting directories in Codex #1202

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion codex/blockexchange/engine/advertiser.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import ../../logutils
import ../../manifest

# tarballs
import ../../tarballs/[directorymanifest, decoding]

logScope:
topics = "codex discoveryengine advertiser"

Expand Down Expand Up @@ -66,7 +69,11 @@
return

without manifest =? Manifest.decode(blk), err:
error "Unable to decode as manifest", err = err.msg
# Not a regular manifest; check whether it is a directory manifest
without manifest =? DirectoryManifest.decode(blk), err:

Check warning on line 73 in codex/blockexchange/engine/advertiser.nim

View check run for this annotation

Codecov / codecov/patch

codex/blockexchange/engine/advertiser.nim#L73

Added line #L73 was not covered by tests
error "Unable to decode as manifest", err = err.msg
return
await b.addCidToQueue(cid)

Check warning on line 76 in codex/blockexchange/engine/advertiser.nim

View check run for this annotation

Codecov / codecov/patch

codex/blockexchange/engine/advertiser.nim#L75-L76

Added lines #L75 - L76 were not covered by tests
return

# announce manifest cid and tree cid
Expand Down
10 changes: 7 additions & 3 deletions codex/node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,18 @@
self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch, fetchLocal)

proc fetchDatasetAsync*(
self: CodexNodeRef, manifest: Manifest, fetchLocal = true
self: CodexNodeRef, manifest: Manifest, fetchLocal = true, onBatch: BatchProc = nil
): Future[void] {.async: (raises: []).} =
## Asynchronously fetch a dataset in the background.
## This task will be tracked and cleaned up on node shutdown.
##
try:
if err =? (
await self.fetchBatched(
manifest = manifest, batchSize = DefaultFetchBatch, fetchLocal = fetchLocal
manifest = manifest,
batchSize = DefaultFetchBatch,
fetchLocal = fetchLocal,

Check warning on line 239 in codex/node.nim

View check run for this annotation

Codecov / codecov/patch

codex/node.nim#L239

Added line #L239 was not covered by tests
onBatch = onBatch,
)
).errorOption:
error "Unable to fetch blocks", err = err.msg
Expand Down Expand Up @@ -394,6 +397,7 @@
filename: ?string = string.none,
mimetype: ?string = string.none,
blockSize = DefaultBlockSize,
pad = true,
): Future[?!Cid] {.async.} =
## Save stream contents as dataset with given blockSize
## to nodes's BlockStore, and return Cid of its manifest
Expand All @@ -403,7 +407,7 @@
let
hcodec = Sha256HashCodec
dataCodec = BlockCodec
chunker = LPStreamChunker.new(stream, chunkSize = blockSize)
chunker = LPStreamChunker.new(stream, chunkSize = blockSize, pad)

var cids: seq[Cid]

Expand Down
169 changes: 168 additions & 1 deletion codex/rest/api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ import ../streams/asyncstreamwrapper
import ../stores
import ../utils/options

# tarballs
import ../tarballs/[directorymanifest, directorydownloader, tarballnodeextensions]

import ./coders
import ./json

Expand All @@ -52,6 +55,11 @@ declareCounter(codex_api_downloads, "codex API downloads")
proc validate(pattern: string, value: string): int {.gcsafe, raises: [Defect].} =
0

proc formatDirectoryManifest(
    cid: Cid, manifest: DirectoryManifest
): RestDirectoryContent =
  ## Wrap `cid` and its directory `manifest` into the REST response object.
  RestDirectoryContent.init(cid, manifest)

proc formatManifest(cid: Cid, manifest: Manifest): RestContent =
  ## Wrap `cid` and its `manifest` into the REST response object.
  RestContent.init(cid, manifest)

Expand Down Expand Up @@ -149,6 +157,59 @@ proc retrieveCid(
if not stream.isNil:
await stream.close()

proc retrieveDirectory(
    node: CodexNodeRef, cid: Cid, resp: HttpResponseRef
): Future[void] {.async: (raises: [CancelledError, HttpWriteError]).} =
  ## Stream the directory identified by `cid` to `resp`.
  ##
  ## The directory manifest is fetched first; if that fails the response is
  ## answered with `Http404` and the error message. Otherwise the chunks
  ## produced by a `DirectoryDownloader` are forwarded to the client until
  ## the downloader yields an empty chunk, which ends the stream.
  ##
  ## Cancellation is logged and re-raised. The downloader is stopped and the
  ## number of bytes sent is logged on every exit path.
  ##
  let directoryDownloader = newDirectoryDownloader(node)

  var bytes = 0
  try:
    without directoryManifest =? (await node.fetchDirectoryManifest(cid)), err:
      error "Unable to fetch Directory Metadata", err = err.msg
      resp.status = Http404
      await resp.sendBody(err.msg)
      return

    resp.addHeader("Content-Type", "application/octet-stream")

    resp.setHeader(
      "Content-Disposition", "attachment; filename=\"" & directoryManifest.name & "\""
    )

    # ToDo: add contentSize to the directory manifest
    # let contentLength = codexManifest.datasetSize
    # resp.setHeader("Content-Length", $(contentLength.int))

    await resp.prepare(HttpResponseStreamType.Plain)

    trace "Streaming directory", cid
    directoryDownloader.start(cid)
    trace "Streaming directory started", cid

    while true:
      trace "Waiting for next directory chunk",
        queued = directoryDownloader.queue.len
      let data = await directoryDownloader.getNext()
      trace "Received directory chunk", size = data.len
      # An empty chunk ends the stream; it also guards the
      # `data[0]` access below.
      if data.len == 0:
        break
      bytes += data.len
      await resp.sendChunk(addr data[0], data.len)

    trace "Finished streaming directory", queued = directoryDownloader.queue.len
    await resp.finish()
    codex_api_downloads.inc()
  except CancelledError as exc:
    info "Streaming directory cancelled", exc = exc.msg
    raise exc
  finally:
    info "Sent bytes for directory", cid, bytes
    await directoryDownloader.stop()

proc buildCorsHeaders(
httpMethod: string, allowedOrigin: Option[string]
): seq[(string, string)] =
Expand Down Expand Up @@ -179,7 +240,76 @@ proc getFilenameFromContentDisposition(contentDisposition: string): ?string =
return filename[0 ..^ 2].some

proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRouter) =
let allowedOrigin = router.allowedOrigin # prevents capture inside of api defintion
let allowedOrigin = router.allowedOrigin # prevents capture inside of api definition

router.api(MethodOptions, "/api/codex/v1/tar") do(
resp: HttpResponseRef
) -> RestApiResponse:
## CORS preflight for the tarball upload endpoint.
##
## When an allowed origin is configured, CORS headers for `POST` are set and
## the `content-type` and `content-disposition` request headers are allowed.
## The response is always `Http204` with an empty body.
##
if corsOrigin =? allowedOrigin:
resp.setCorsHeaders("POST", corsOrigin)
resp.setHeader(
"Access-Control-Allow-Headers", "content-type, content-disposition"
)

resp.status = Http204
await resp.sendBody("")

router.rawApi(MethodPost, "/api/codex/v1/tar") do() -> RestApiResponse:
## Upload a tarball in a streaming manner.
##
## The request body is handed to `node.storeTarball`; on success the value
## it returns is sent back with content type `application/json`.
## Responds with `Http422` for an unknown MIME type or an invalid filename,
## and with `Http500` when the body reader cannot be obtained, when storing
## fails, or when the upload is cancelled or the stream errors.
##

trace "Handling upload of a tar file"
var bodyReader = request.getBodyReader()
if bodyReader.isErr():
return RestApiResponse.error(Http500, msg = bodyReader.error())

# Attempt to handle `Expect` header
# some clients (curl), wait 1000ms
# before giving up
#
await request.handleExpect()

var mimetype = request.headers.getString(ContentTypeHeader).some

# A non-empty Content-Type must map to a known file extension;
# an empty one is treated as "no MIME type given".
if mimetype.get() != "":
let mimetypeVal = mimetype.get()
var m = newMimetypes()
let extension = m.getExt(mimetypeVal, "")
if extension == "":
return RestApiResponse.error(
Http422, "The MIME type '" & mimetypeVal & "' is not valid."
)
else:
mimetype = string.none

const ContentDispositionHeader = "Content-Disposition"
let contentDisposition = request.headers.getString(ContentDispositionHeader)
let filename = getFilenameFromContentDisposition(contentDisposition)

if filename.isSome and not isValidFilename(filename.get()):
return RestApiResponse.error(Http422, "The filename is not valid.")

# Here we could check if the extension matches the filename if needed

let reader = bodyReader.get()
let stream = AsyncStreamReader(reader)

# NOTE(review): `mimetype` and `filename` are validated above but are not
# passed to `storeTarball` - confirm whether that is intended.
try:
without json =? (await node.storeTarball(stream = stream)), error:
error "Error uploading tarball", exc = error.msg
return RestApiResponse.error(Http500, error.msg)

codex_api_uploads.inc()
trace "Uploaded tarball", result = json
return RestApiResponse.response(json, contentType = "application/json")
except CancelledError:
# NOTE(review): cancellation is answered with Http500 instead of being
# re-raised - confirm this is the desired behaviour.
trace "Upload cancelled error"
return RestApiResponse.error(Http500)
except AsyncStreamError:
trace "Async stream error"
return RestApiResponse.error(Http500)
finally:
# The body reader is closed on every exit path.
await reader.closeWait()

router.api(MethodOptions, "/api/codex/v1/data") do(
resp: HttpResponseRef
Expand Down Expand Up @@ -345,6 +475,25 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
resp.setHeader("Access-Control-Expose-Headers", "Content-Disposition")
await node.retrieveCid(cid.get(), local = false, resp = resp)

router.api(MethodGet, "/api/codex/v1/dir/{cid}/network/stream") do(
cid: Cid, resp: HttpResponseRef
) -> RestApiResponse:
## Download a directory in a streaming manner.
##
## Responds with `Http400` when `cid` cannot be parsed; otherwise the
## streaming itself is delegated to `retrieveDirectory`.
##

var headers = buildCorsHeaders("GET", allowedOrigin)

if cid.isErr:
return RestApiResponse.error(Http400, $cid.error(), headers = headers)

if corsOrigin =? allowedOrigin:
resp.setCorsHeaders("GET", corsOrigin)
# NOTE(review): the OPTIONS handler for `/api/codex/v1/tar` uses
# "Access-Control-Allow-Headers"; presumably the same name is meant
# here - confirm.
resp.setHeader("Access-Control-Headers", "X-Requested-With")

resp.setHeader("Access-Control-Expose-Headers", "Content-Disposition")
await node.retrieveDirectory(cid.get(), resp = resp)

router.api(MethodGet, "/api/codex/v1/data/{cid}/network/manifest") do(
cid: Cid, resp: HttpResponseRef
) -> RestApiResponse:
Expand All @@ -363,6 +512,24 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
let json = %formatManifest(cid.get(), manifest)
return RestApiResponse.response($json, contentType = "application/json")

router.api(MethodGet, "/api/codex/v1/data/{cid}/network/dirmanifest") do(
cid: Cid, resp: HttpResponseRef
) -> RestApiResponse:
## Download only the directory manifest.
##
## Responds with `Http400` when `cid` cannot be parsed and with `Http404`
## when the directory manifest cannot be fetched. On success the manifest
## is returned as JSON built by `formatDirectoryManifest`.
##

var headers = buildCorsHeaders("GET", allowedOrigin)

if cid.isErr:
return RestApiResponse.error(Http400, $cid.error(), headers = headers)

without manifest =? (await node.fetchDirectoryManifest(cid.get())), err:
error "Failed to fetch directory manifest", err = err.msg
return RestApiResponse.error(Http404, err.msg, headers = headers)

# NOTE(review): unlike the error responses above, the success response is
# built without `headers` - confirm whether CORS headers are needed here.
let json = %formatDirectoryManifest(cid.get(), manifest)
return RestApiResponse.response($json, contentType = "application/json")

router.api(MethodGet, "/api/codex/v1/space") do() -> RestApiResponse:
let json =
%RestRepoStore(
Expand Down
11 changes: 11 additions & 0 deletions codex/rest/json.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import ../utils/json
import ../manifest
import ../units

import ../tarballs/directorymanifest

export json

type
Expand Down Expand Up @@ -47,6 +49,10 @@ type
cid* {.serialize.}: Cid
manifest* {.serialize.}: Manifest

RestDirectoryContent* = object
## REST payload pairing a directory manifest with its cid; both fields
## are marked `{.serialize.}`.
cid* {.serialize.}: Cid
manifest* {.serialize.}: DirectoryManifest

RestContentList* = object
content* {.serialize.}: seq[RestContent]

Expand Down Expand Up @@ -81,6 +87,11 @@ proc init*(_: type RestContentList, content: seq[RestContent]): RestContentList
proc init*(_: type RestContent, cid: Cid, manifest: Manifest): RestContent =
  ## Pair `cid` and `manifest` in a `RestContent`.
  result = RestContent(cid: cid, manifest: manifest)

proc init*(
    _: type RestDirectoryContent, cid: Cid, manifest: DirectoryManifest
): RestDirectoryContent =
  ## Pair `cid` and the directory `manifest` in a `RestDirectoryContent`.
  result = RestDirectoryContent(cid: cid, manifest: manifest)

proc init*(_: type RestNode, node: dn.Node): RestNode =
RestNode(
nodeId: RestNodeId.init(node.id),
Expand Down
60 changes: 60 additions & 0 deletions codex/tarballs/decoding.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
## Nim-Codex
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.

{.push raises: [].}

import pkg/libp2p/cid
import pkg/libp2p/multihash
import pkg/libp2p/protobuf/minprotobuf

import pkg/questionable/results

import ../blocktype
import ./directorymanifest

func decode*(_: type DirectoryManifest, data: openArray[byte]): ?!DirectoryManifest =
  ## Decode a `DirectoryManifest` from its protobuf encoding in `data`.
  ##
  ## Expected layout:
  ##
  ## ```protobuf
  ##   Message DirectoryManifest {
  ##     Message Cid {
  ##       bytes data = 1;
  ##     }
  ##     string name = 1;
  ##     repeated Cid cids = 2;
  ##   }
  ## ```
  ##
  ## Returns a failure when reading field 1 (`name`) errors, when reading
  ## the repeated field 2 (`cids`) errors, when an entry's `data` field
  ## cannot be read, or when an entry's bytes do not form a valid `Cid`.

  var
    pbNode = initProtoBuffer(data)
    name: string
    cids: seq[Cid]
    cidsBytes: seq[seq[byte]]

  if pbNode.getField(1, name).isErr:
    return failure("Unable to decode `name` from DirectoryManifest")

  # Each repeated entry is itself a nested `Cid` message whose
  # field 1 carries the raw cid bytes.
  if ?pbNode.getRepeatedField(2, cidsBytes).mapFailure:
    for cidEntry in cidsBytes:
      var pbCid = initProtoBuffer(cidEntry)
      var dataBuf = newSeq[byte]()
      if pbCid.getField(1, dataBuf).isErr:
        return failure("Unable to decode piece `data` to Cid")
      without cid =? Cid.init(dataBuf).mapFailure, err:
        return failure(err.msg)
      cids.add(cid)

  DirectoryManifest(name: name, cids: cids).success

func decode*(_: type DirectoryManifest, blk: Block): ?!DirectoryManifest =
  ## Decode a directory manifest from the data carried by `blk`.
  ##
  ## Fails when the manifest check on the block's cid errors or reports
  ## that the cid is not a manifest cid; otherwise the block's data is
  ## decoded as a `DirectoryManifest`.
  ##

  let cidIsManifest = ?blk.cid.isManifest
  if not cidIsManifest:
    return failure("Cid is not a Directory Manifest Cid")

  return DirectoryManifest.decode(blk.data)

Check warning on line 60 in codex/tarballs/decoding.nim

View check run for this annotation

Codecov / codecov/patch

codex/tarballs/decoding.nim#L31-L60

Added lines #L31 - L60 were not covered by tests
Loading
Loading