diff --git a/codex/blockexchange/engine/advertiser.nim b/codex/blockexchange/engine/advertiser.nim
index d094c454a..162ba554b 100644
--- a/codex/blockexchange/engine/advertiser.nim
+++ b/codex/blockexchange/engine/advertiser.nim
@@ -25,6 +25,9 @@ import ../../stores/blockstore
 import ../../logutils
 import ../../manifest
 
+# tarballs
+import ../../tarballs/[directorymanifest, decoding]
+
 logScope:
   topics = "codex discoveryengine advertiser"
 
@@ -66,7 +69,11 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError])
     return
 
   without manifest =? Manifest.decode(blk), err:
-    error "Unable to decode as manifest", err = err.msg
+    # Fall back to decoding as a directory manifest
+    without manifest =? DirectoryManifest.decode(blk), err:
+      error "Unable to decode as manifest", err = err.msg
+      return
+    await b.addCidToQueue(cid)
     return
 
   # announce manifest cid and tree cid
diff --git a/codex/node.nim b/codex/node.nim
index fb653c0d7..ca562b9ff 100644
--- a/codex/node.nim
+++ b/codex/node.nim
@@ -226,7 +226,7 @@ proc fetchBatched*(
   self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch, fetchLocal)
 
 proc fetchDatasetAsync*(
-    self: CodexNodeRef, manifest: Manifest, fetchLocal = true
+    self: CodexNodeRef, manifest: Manifest, fetchLocal = true, onBatch: BatchProc = nil
 ): Future[void] {.async: (raises: []).} =
   ## Asynchronously fetch a dataset in the background.
   ## This task will be tracked and cleaned up on node shutdown.
@@ -234,7 +234,10 @@ proc fetchDatasetAsync*(
   try:
     if err =? (
       await self.fetchBatched(
-        manifest = manifest, batchSize = DefaultFetchBatch, fetchLocal = fetchLocal
+        manifest = manifest,
+        batchSize = DefaultFetchBatch,
+        fetchLocal = fetchLocal,
+        onBatch = onBatch,
       )
     ).errorOption:
       error "Unable to fetch blocks", err = err.msg
@@ -394,6 +397,7 @@ proc store*(
     filename: ?string = string.none,
     mimetype: ?string = string.none,
     blockSize = DefaultBlockSize,
+    pad = true,
 ): Future[?!Cid] {.async.} =
   ## Save stream contents as dataset with given blockSize
   ## to nodes's BlockStore, and return Cid of its manifest
@@ -403,7 +407,7 @@ proc store*(
   let
     hcodec = Sha256HashCodec
     dataCodec = BlockCodec
-    chunker = LPStreamChunker.new(stream, chunkSize = blockSize)
+    chunker = LPStreamChunker.new(stream, chunkSize = blockSize, pad)
 
   var cids: seq[Cid]
 
diff --git a/codex/rest/api.nim b/codex/rest/api.nim
index 7c7dcd348..8010b8e6b 100644
--- a/codex/rest/api.nim
+++ b/codex/rest/api.nim
@@ -40,6 +40,9 @@ import ../streams/asyncstreamwrapper
 import ../stores
 import ../utils/options
 
+# tarballs
+import ../tarballs/[directorymanifest, directorydownloader, tarballnodeextensions]
+
 import ./coders
 import ./json
 
@@ -52,6 +55,11 @@ declareCounter(codex_api_downloads, "codex API downloads")
 proc validate(pattern: string, value: string): int {.gcsafe, raises: [Defect].} =
   0
 
+proc formatDirectoryManifest(
+    cid: Cid, manifest: DirectoryManifest
+): RestDirectoryContent =
+  return RestDirectoryContent.init(cid, manifest)
+
 proc formatManifest(cid: Cid, manifest: Manifest): RestContent =
   return RestContent.init(cid, manifest)
 
@@ -149,6 +157,59 @@ proc retrieveCid(
     if not stream.isNil:
       await stream.close()
 
+proc retrieveDirectory(
+    node: CodexNodeRef, cid: Cid, resp: HttpResponseRef
+): Future[void] {.async: (raises: [CancelledError, HttpWriteError]).} =
+  ## Download a directory from the node as a tar
+  ## archive in a streaming manner
+  ##
+  let directoryDownloader = newDirectoryDownloader(node)
+
+  var bytes = 0
+  try:
+    without directoryManifest =?
(await node.fetchDirectoryManifest(cid)), err: + error "Unable to fetch Directory Metadata", err = err.msg + resp.status = Http404 + await resp.sendBody(err.msg) + return + + resp.addHeader("Content-Type", "application/octet-stream") + + resp.setHeader( + "Content-Disposition", "attachment; filename=\"" & directoryManifest.name & "\"" + ) + + # ToDo: add contentSize to the directory manifest + # let contentLength = codexManifest.datasetSize + # resp.setHeader("Content-Length", $(contentLength.int)) + + await resp.prepare(HttpResponseStreamType.Plain) + + echo "streaming directory: ", cid + directoryDownloader.start(cid) + + echo "streaming directory started: ", cid + echo "after sleep..." + + while true: + echo "getNext: ", directoryDownloader.queue.len, " entries in queue" + let data = await directoryDownloader.getNext() + echo "getNext[2]: ", data.len, " bytes" + if data.len == 0: + break + bytes += data.len + await resp.sendChunk(addr data[0], data.len) + + echo "out of loop: ", directoryDownloader.queue.len, " entries in queue" + await resp.finish() + codex_api_downloads.inc() + except CancelledError as exc: + info "Streaming directory cancelled", exc = exc.msg + raise exc + finally: + info "Sent bytes for directory", cid, bytes + await directoryDownloader.stop() + proc buildCorsHeaders( httpMethod: string, allowedOrigin: Option[string] ): seq[(string, string)] = @@ -179,7 +240,76 @@ proc getFilenameFromContentDisposition(contentDisposition: string): ?string = return filename[0 ..^ 2].some proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRouter) = - let allowedOrigin = router.allowedOrigin # prevents capture inside of api defintion + let allowedOrigin = router.allowedOrigin # prevents capture inside of api definition + + router.api(MethodOptions, "/api/codex/v1/tar") do( + resp: HttpResponseRef + ) -> RestApiResponse: + if corsOrigin =? allowedOrigin: + resp.setCorsHeaders("POST", corsOrigin) + resp.setHeader( + "Access-Control-Allow-Headers", "content-type, content-disposition" + ) + + resp.status = Http204 + await resp.sendBody("") + + router.rawApi(MethodPost, "/api/codex/v1/tar") do() -> RestApiResponse: + ## Upload a file in a streaming manner + ## + + trace "Handling upload of a tar file" + var bodyReader = request.getBodyReader() + if bodyReader.isErr(): + return RestApiResponse.error(Http500, msg = bodyReader.error()) + + # Attempt to handle `Expect` header + # some clients (curl), wait 1000ms + # before giving up + # + await request.handleExpect() + + var mimetype = request.headers.getString(ContentTypeHeader).some + + if mimetype.get() != "": + let mimetypeVal = mimetype.get() + var m = newMimetypes() + let extension = m.getExt(mimetypeVal, "") + if extension == "": + return RestApiResponse.error( + Http422, "The MIME type '" & mimetypeVal & "' is not valid." + ) + else: + mimetype = string.none + + const ContentDispositionHeader = "Content-Disposition" + let contentDisposition = request.headers.getString(ContentDispositionHeader) + let filename = getFilenameFromContentDisposition(contentDisposition) + + if filename.isSome and not isValidFilename(filename.get()): + return RestApiResponse.error(Http422, "The filename is not valid.") + + # Here we could check if the extension matches the filename if needed + + let reader = bodyReader.get() + let stream = AsyncStreamReader(reader) + + try: + without json =? 
(await node.storeTarball(stream = stream)), error: + error "Error uploading tarball", exc = error.msg + return RestApiResponse.error(Http500, error.msg) + + codex_api_uploads.inc() + trace "Uploaded tarball", result = json + return RestApiResponse.response(json, contentType = "application/json") + except CancelledError: + trace "Upload cancelled error" + return RestApiResponse.error(Http500) + except AsyncStreamError: + trace "Async stream error" + return RestApiResponse.error(Http500) + finally: + await reader.closeWait() router.api(MethodOptions, "/api/codex/v1/data") do( resp: HttpResponseRef @@ -345,6 +475,25 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute resp.setHeader("Access-Control-Expose-Headers", "Content-Disposition") await node.retrieveCid(cid.get(), local = false, resp = resp) + router.api(MethodGet, "/api/codex/v1/dir/{cid}/network/stream") do( + cid: Cid, resp: HttpResponseRef + ) -> RestApiResponse: + ## Download a file from the network in a streaming + ## manner + ## + + var headers = buildCorsHeaders("GET", allowedOrigin) + + if cid.isErr: + return RestApiResponse.error(Http400, $cid.error(), headers = headers) + + if corsOrigin =? allowedOrigin: + resp.setCorsHeaders("GET", corsOrigin) + resp.setHeader("Access-Control-Headers", "X-Requested-With") + + resp.setHeader("Access-Control-Expose-Headers", "Content-Disposition") + await node.retrieveDirectory(cid.get(), resp = resp) + router.api(MethodGet, "/api/codex/v1/data/{cid}/network/manifest") do( cid: Cid, resp: HttpResponseRef ) -> RestApiResponse: @@ -363,6 +512,24 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute let json = %formatManifest(cid.get(), manifest) return RestApiResponse.response($json, contentType = "application/json") + router.api(MethodGet, "/api/codex/v1/data/{cid}/network/dirmanifest") do( + cid: Cid, resp: HttpResponseRef + ) -> RestApiResponse: + ## Download only the directory manifest. + ## + + var headers = buildCorsHeaders("GET", allowedOrigin) + + if cid.isErr: + return RestApiResponse.error(Http400, $cid.error(), headers = headers) + + without manifest =? 
(await node.fetchDirectoryManifest(cid.get())), err: + error "Failed to fetch directory manifest", err = err.msg + return RestApiResponse.error(Http404, err.msg, headers = headers) + + let json = %formatDirectoryManifest(cid.get(), manifest) + return RestApiResponse.response($json, contentType = "application/json") + router.api(MethodGet, "/api/codex/v1/space") do() -> RestApiResponse: let json = %RestRepoStore( diff --git a/codex/rest/json.nim b/codex/rest/json.nim index 1b9459c12..ec64c311b 100644 --- a/codex/rest/json.nim +++ b/codex/rest/json.nim @@ -9,6 +9,8 @@ import ../utils/json import ../manifest import ../units +import ../tarballs/directorymanifest + export json type @@ -47,6 +49,10 @@ type cid* {.serialize.}: Cid manifest* {.serialize.}: Manifest + RestDirectoryContent* = object + cid* {.serialize.}: Cid + manifest* {.serialize.}: DirectoryManifest + RestContentList* = object content* {.serialize.}: seq[RestContent] @@ -81,6 +87,11 @@ proc init*(_: type RestContentList, content: seq[RestContent]): RestContentList proc init*(_: type RestContent, cid: Cid, manifest: Manifest): RestContent = RestContent(cid: cid, manifest: manifest) +proc init*( + _: type RestDirectoryContent, cid: Cid, manifest: DirectoryManifest +): RestDirectoryContent = + RestDirectoryContent(cid: cid, manifest: manifest) + proc init*(_: type RestNode, node: dn.Node): RestNode = RestNode( nodeId: RestNodeId.init(node.id), diff --git a/codex/tarballs/decoding.nim b/codex/tarballs/decoding.nim new file mode 100644 index 000000000..d1963d27e --- /dev/null +++ b/codex/tarballs/decoding.nim @@ -0,0 +1,60 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import pkg/libp2p/cid +import pkg/libp2p/multihash +import pkg/libp2p/protobuf/minprotobuf + +import pkg/questionable/results + +import ../blocktype +import ./directorymanifest + +func decode*(_: type DirectoryManifest, data: openArray[byte]): ?!DirectoryManifest = + # ```protobuf + # Message DirectoryManifest { + # Message Cid { + # bytes data = 1; + # } + # string name = 1; + # repeated Cid cids = 2; + # ``` + + var + pbNode = initProtoBuffer(data) + pbInfo: ProtoBuffer + name: string + cids: seq[Cid] + cidsBytes: seq[seq[byte]] + + if pbNode.getField(1, name).isErr: + return failure("Unable to decode `name` from DirectoryManifest") + + if ?pbNode.getRepeatedField(2, cidsBytes).mapFailure: + for cidEntry in cidsBytes: + var pbCid = initProtoBuffer(cidEntry) + var dataBuf = newSeq[byte]() + if pbCid.getField(1, dataBuf).isErr: + return failure("Unable to decode piece `data` to Cid") + without cid =? 
Cid.init(dataBuf).mapFailure, err: + return failure(err.msg) + cids.add(cid) + + DirectoryManifest(name: name, cids: cids).success + +func decode*(_: type DirectoryManifest, blk: Block): ?!DirectoryManifest = + ## Decode a directory manifest using `decoder` + ## + + if not ?blk.cid.isManifest: + return failure "Cid is not a Directory Manifest Cid" + + DirectoryManifest.decode(blk.data) diff --git a/codex/tarballs/directorydownloader.nim b/codex/tarballs/directorydownloader.nim new file mode 100644 index 000000000..63e3fb303 --- /dev/null +++ b/codex/tarballs/directorydownloader.nim @@ -0,0 +1,256 @@ +## Nim-Codex +## Copyright (c) 2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import std/os +import std/times +import std/strutils +import std/sequtils +import std/sugar +import pkg/chronos +import pkg/libp2p/[cid, multihash] +import pkg/libp2p/stream/lpstream +import pkg/questionable/results +import pkg/stew/byteutils + +import ../node +import ../logutils +import ../utils/iter +import ../utils/trackedfutures +import ../errors +import ../manifest +import ../blocktype +import ../stores/blockstore + +import ./tarballs +import ./directorymanifest +import ./decoding + +logScope: + topics = "codex node directorydownloader" + +type DirectoryDownloader* = ref object + node: CodexNodeRef + queue*: AsyncQueue[seq[byte]] + finished: bool + trackedFutures: TrackedFutures + +proc printQueue(self: DirectoryDownloader) = + echo "Queue: ", self.queue.len, " entries" + for i in 0 ..< self.queue.len: + echo "Entry ", i, ": ", self.queue[i].len, " bytes" + +proc createEntryHeader( + self: DirectoryDownloader, entry: TarballEntry, basePath: string +): string = + echo "Creating entry header for ", entry.name + echo "basePath = ", basePath + echo "entry = ", entry + result = newStringOfCap(512) + result.add(entry.name) + result.setLen(100) + # ToDo: read permissions from the TarballEntry + if entry.kind == ekDirectory: + result.add("000755 \0") # Dir mode + else: + result.add("000644 \0") # File mode + result.add(toOct(0, 6) & " \0") # Owner's numeric user ID + result.add(toOct(0, 6) & " \0") # Group's numeric user ID + result.add(toOct(entry.contentLength, 11) & ' ') # File size + result.add(toOct(entry.lastModified.toUnix(), 11) & ' ') # Last modified time + result.add(" ") # Empty checksum for now + result.setLen(156) + result.add(ord(entry.kind).char) + result.setLen(257) + result.add("ustar\0") # UStar indicator + result.add(toOct(0, 2)) # UStar version + result.setLen(329) + result.add(toOct(0, 6) & "\0 ") # Device major number + result.add(toOct(0, 6) & "\0 ") # Device minor number + result.add(basePath) + result.setLen(512) + + var checksum: int + for i in 0 ..< result.len: + checksum += result[i].int + + let checksumStr = toOct(checksum, 6) & '\0' + for i in 0 ..< checksumStr.len: + result[148 + i] = checksumStr[i] + +proc fetchTarball( + self: DirectoryDownloader, cid: Cid, basePath = "" +): Future[?!void] {.async: (raises: [CancelledError]).} = + # we only need try/catch here because PR for checked exceptions is + # not yet merged + try: + echo "fetchTarball: ", cid, " basePath = ", basePath + # we got a Cid - let's check if this is a manifest (can be either + # a directory or file manifest) + without isM =? 
cid.isManifest, err: + warn "Unable to determine if cid is a manifest" + return failure("Unable to determine if cid is a manifest") + + if not isM: + # this is not a manifest, so we can return + return failure("given cid is not a manifest: " & $cid) + + # get the manifest + without blk =? await self.node.blockStore.getBlock(cid), err: + error "Error retrieving manifest block", cid, err = err.msg + return failure( + "Error retrieving manifest block (cid = " & $cid & "), err = " & err.msg + ) + + without manifest =? Manifest.decode(blk), err: + info "Unable to decode as manifest - trying to decode as directory manifest", + err = err.msg + # Try if it not a directory manifest + without manifest =? DirectoryManifest.decode(blk), err: + error "Unable to decode as directory manifest", err = err.msg + return failure("Unable to decode as valid manifest (cid = " & $cid & ")") + # this is a directory manifest + echo "Decoded directory manifest: ", $manifest + let dirEntry = TarballEntry( + kind: ekDirectory, + name: manifest.name, + lastModified: getTime(), # ToDo: store actual time in the manifest + permissions: parseFilePermissions(cast[uint32](0o755)), # same here + contentLength: 0, + ) + let header = self.createEntryHeader(dirEntry, basePath) + echo "header = ", header + await self.queue.addLast(header.toBytes()) + self.printQueue() + var entryLength = header.len + let alignedEntryLength = (entryLength + 511) and not 511 # 512 byte aligned + if alignedEntryLength - entryLength > 0: + echo "Adding ", alignedEntryLength - entryLength, " bytes of padding" + var data = newSeq[byte]() + data.setLen(alignedEntryLength - entryLength) + await self.queue.addLast(data) + self.printQueue() + + for cid in manifest.cids: + echo "fetching directory content: ", cid + if err =? (await self.fetchTarball(cid, basePath / manifest.name)).errorOption: + error "Error fetching directory content", + cid, path = basePath / manifest.name, err = err.msg + return failure( + "Error fetching directory content (cid = " & $cid & "), err = " & err.msg + ) + echo "fetchTarball[DIR]: ", cid, " basePath = ", basePath, " done" + return success() + + # this is a regular file (Codex) manifest + echo "Decoded file manifest: ", $manifest + let fileEntry = TarballEntry( + kind: ekNormalFile, + name: manifest.filename |? 
"unknown", + lastModified: getTime(), # ToDo: store actual time in the manifest + permissions: parseFilePermissions(cast[uint32](0o644)), # same here + contentLength: manifest.datasetSize.int, + ) + let header = self.createEntryHeader(fileEntry, basePath) + await self.queue.addLast(header.toBytes()) + self.printQueue() + var contentLength = 0 + + proc onBatch(blocks: seq[Block]): Future[?!void] {.async.} = + echo "onBatch: ", blocks.len, " blocks" + for blk in blocks: + echo "onBatch[blk.data]: ", string.fromBytes(blk.data) + # await self.queue.addLast(string.fromBytes(blk.data)) + await self.queue.addLast(blk.data) + self.printQueue() + contentLength += blk.data.len + # this can happen if the content was stored with padding + if contentLength > manifest.datasetSize.int: + contentLength = manifest.datasetSize.int + echo "onBatch[contentLength]: ", contentLength + success() + + await self.node.fetchDatasetAsync(manifest, fetchLocal = true, onBatch = onBatch) + + echo "contentLength: ", contentLength + echo "manifest.datasetSize.int: ", manifest.datasetSize.int + if contentLength != manifest.datasetSize.int: + echo "Warning: entry length mismatch, expected ", + manifest.datasetSize.int, " got ", contentLength + + let entryLength = header.len + contentLength + let alignedEntryLength = (entryLength + 511) and not 511 # 512 byte aligned + if alignedEntryLength - entryLength > 0: + echo "Adding ", alignedEntryLength - entryLength, " bytes of padding" + var data = newSeq[byte]() + echo "alignedEntryLength: ", alignedEntryLength + echo "entryLength: ", entryLength + echo "alignedEntryLength - entryLength: ", alignedEntryLength - entryLength + data.setLen(alignedEntryLength - entryLength) + echo "data.len: ", data.len + await self.queue.addLast(data) + self.printQueue() + echo "fetchTarball: ", cid, " basePath = ", basePath, " done" + return success() + except CancelledError as e: + raise e + except CatchableError as e: + error "Error fetching tarball", cid, err = e.msg + return failure("Error fetching tarball (cid = " & $cid & "), err = " & e.msg) + +proc streamDirectory( + self: DirectoryDownloader, cid: Cid +): Future[void] {.async: (raises: []).} = + try: + if err =? 
(await self.fetchTarball(cid, basePath = "")).errorOption: + error "Error fetching directory content", cid, err = err.msg + return + # Two consecutive zero-filled records at end + var data = newSeq[byte]() + data.setLen(1024) + await self.queue.addLast(data) + self.printQueue() + # mark the end of the stream + self.finished = true + echo "streamDirectory: ", cid, " done" + except CancelledError: + info "Streaming directory cancelled:", cid + +########################################################################### +# Public API +########################################################################### + +proc start*(self: DirectoryDownloader, cid: Cid) = + ## Starts streaming the directory content + self.trackedFutures.track(self.streamDirectory(cid)) + +proc stop*(self: DirectoryDownloader) {.async: (raises: []).} = + await noCancel self.trackedFutures.cancelTracked() + +proc getNext*( + self: DirectoryDownloader +): Future[seq[byte]] {.async: (raises: [CancelledError]).} = + ## Returns the next entry from the queue + echo "getNext: ", self.queue.len, " entries in queue" + if (self.queue.len == 0 and self.finished): + return @[] + echo "getNext[2]: ", self.queue.len, " entries in queue" + let chunk = await self.queue.popFirst() + echo "getNext: ", chunk.len, " bytes" + return chunk + +proc newDirectoryDownloader*(node: CodexNodeRef): DirectoryDownloader = + ## Creates a new DirectoryDownloader instance + DirectoryDownloader( + node: node, + queue: newAsyncQueue[seq[byte]](), + finished: false, + trackedFutures: TrackedFutures(), + ) diff --git a/codex/tarballs/directorymanifest.nim b/codex/tarballs/directorymanifest.nim new file mode 100644 index 000000000..1e83158b7 --- /dev/null +++ b/codex/tarballs/directorymanifest.nim @@ -0,0 +1,26 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import pkg/libp2p/cid +import ../utils/json + +type DirectoryManifest* = ref object + name* {.serialize.}: string + cids* {.serialize.}: seq[Cid] + +proc `$`*(self: DirectoryManifest): string = + "DirectoryManifest(name: " & self.name & ", cids: " & $self.cids & ")" + +func `==`*(a: DirectoryManifest, b: DirectoryManifest): bool = + a.name == b.name and a.cids == b.cids + +proc newDirectoryManifest*(name: string, cids: seq[Cid]): DirectoryManifest = + DirectoryManifest(name: name, cids: cids) diff --git a/codex/tarballs/encoding.nim b/codex/tarballs/encoding.nim new file mode 100644 index 000000000..8e25853cb --- /dev/null +++ b/codex/tarballs/encoding.nim @@ -0,0 +1,39 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. 
+ +{.push raises: [].} + +import pkg/libp2p/cid +import pkg/libp2p/protobuf/minprotobuf + +import ./directorymanifest + +proc write(pb: var ProtoBuffer, field: int, value: Cid) = + var ipb = initProtoBuffer() + ipb.write(1, value.data.buffer) + ipb.finish() + pb.write(field, ipb) + +proc encode*(manifest: DirectoryManifest): seq[byte] = + # ```protobuf + # Message DirectoryManifest { + # Message Cid { + # bytes data = 1; + # } + # + # string name = 1; + # repeated Cid cids = 2; + # ``` + + var ipb = initProtoBuffer() + ipb.write(1, manifest.name) + for cid in manifest.cids: + ipb.write(2, cid) + ipb.finish() + ipb.buffer diff --git a/codex/tarballs/stdstreamwrapper.nim b/codex/tarballs/stdstreamwrapper.nim new file mode 100644 index 000000000..10e3aeb07 --- /dev/null +++ b/codex/tarballs/stdstreamwrapper.nim @@ -0,0 +1,72 @@ +## Nim-LibP2P +## Copyright (c) 2019 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import std/streams +import pkg/libp2p +import pkg/chronos + +import ../logutils + +logScope: + topics = "libp2p stdstreamwrapper" + +const StdStreamWrapperName* = "StdStreamWrapper" + +type StdStreamWrapper* = ref object of LPStream + stream*: Stream + +method initStream*(self: StdStreamWrapper) = + if self.objName.len == 0: + self.objName = StdStreamWrapperName + + procCall LPStream(self).initStream() + +proc newStdStreamWrapper*(stream: Stream = nil): StdStreamWrapper = + let stream = StdStreamWrapper(stream: stream) + + stream.initStream() + return stream + +template withExceptions(body: untyped) = + try: + body + except CatchableError as exc: + raise newException(Defect, "Unexpected error in StdStreamWrapper", exc) + +method readOnce*( + self: StdStreamWrapper, pbytes: pointer, nbytes: int +): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} = + trace "Reading bytes from stream", bytes = nbytes + if isNil(self.stream): + error "StdStreamWrapper: stream is nil" + raiseAssert("StdStreamWrapper: stream is nil") + + if self.atEof: + raise newLPStreamEOFError() + + withExceptions: + return self.stream.readData(pbytes, nbytes) + +method atEof*(self: StdStreamWrapper): bool = + withExceptions: + return self.stream.atEnd() + +method closeImpl*(self: StdStreamWrapper) {.async: (raises: []).} = + try: + trace "Shutting down std stream" + + self.stream.close() + + trace "Shutdown async chronos stream" + except CatchableError as exc: + trace "Error closing std stream", msg = exc.msg + + await procCall LPStream(self).closeImpl() diff --git a/codex/tarballs/tarballnodeextensions.nim b/codex/tarballs/tarballnodeextensions.nim new file mode 100644 index 000000000..fb4c4b9ea --- /dev/null +++ b/codex/tarballs/tarballnodeextensions.nim @@ -0,0 +1,126 @@ +import std/streams + +import pkg/chronos +import pkg/libp2p/cid +import pkg/questionable/results + +import ../node +import ../blocktype +import ../manifest +import ../stores/blockstore + +import ./tarballs +import ./stdstreamwrapper +import ./directorymanifest +import ./encoding +import ./decoding + +proc fetchDirectoryManifest*( + self: CodexNodeRef, cid: Cid +): Future[?!DirectoryManifest] {.async: (raises: [CancelledError]).} = + ## Fetch and decode a manifest block + ## + + # we only need try/catch here because PR for checked exceptions is + # not yet merged + 
  try:
+    if err =? cid.isManifest.errorOption:
+      return failure("CID has invalid content type for a manifest: " & $cid)
+
+    trace "Retrieving directory manifest for cid", cid
+
+    without blk =? await self.blockStore.getBlock(BlockAddress.init(cid)), err:
+      trace "Error retrieving directory manifest block", cid, err = err.msg
+      return failure err
+
+    trace "Decoding directory manifest for cid", cid
+
+    without manifest =? DirectoryManifest.decode(blk), err:
+      trace "Unable to decode as directory manifest", err = err.msg
+      return failure("Unable to decode as directory manifest")
+
+    trace "Decoded directory manifest", cid
+
+    return manifest.success
+  except CancelledError as e:
+    raise e
+  except CatchableError as e:
+    trace "Error fetching directory manifest", cid, err = e.msg
+    return failure(e.msg)
+
+proc storeDirectoryManifest*(
+    self: CodexNodeRef, manifest: DirectoryManifest
+): Future[?!Block] {.async.} =
+  let encodedManifest = manifest.encode()
+
+  without blk =? Block.new(data = encodedManifest, codec = ManifestCodec), error:
+    trace "Unable to create block from manifest"
+    return failure(error)
+
+  if err =? (await self.blockStore.putBlock(blk)).errorOption:
+    trace "Unable to store manifest block", cid = blk.cid, err = err.msg
+    return failure(err)
+
+  success blk
+
+proc storeTarball*(
+    self: CodexNodeRef, stream: AsyncStreamReader
+): Future[?!string] {.async.} =
+  info "Storing tarball data"
+
+  # As a proof of concept, we process the whole tar archive in memory.
+  # Later we can switch to proper streaming: either store the tarball
+  # in a temporary folder first, or process it incrementally.
+  let tarballBytes = await stream.read()
+  let stream = newStringStream(string.fromBytes(tarballBytes))
+
+  proc onProcessedTarFile(
+      stream: Stream, fileName: string
+  ): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} =
+    try:
+      echo "onProcessedTarFile:name: ", fileName
+      let stream = newStdStreamWrapper(stream)
+      await self.store(stream, filename = some fileName, pad = false)
+    except CancelledError as e:
+      raise e
+    except CatchableError as e:
+      error "Error processing tar file", fileName, exc = e.msg
+      return failure(e.msg)
+
+  proc onProcessedTarDir(
+      name: string, cids: seq[Cid]
+  ): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} =
+    try:
+      echo "onProcessedTarDir:name: ", name
+      echo "onProcessedTarDir:cids: ", cids
+      let directoryManifest = newDirectoryManifest(name = name, cids = cids)
+      without manifestBlk =? await self.storeDirectoryManifest(directoryManifest), err:
+        error "Unable to store manifest"
+        return failure(err)
+      manifestBlk.cid.success
+    except CancelledError as e:
+      raise e
+    except CatchableError as e:
+      error "Error processing tar dir", name, exc = e.msg
+      return failure(e.msg)
+
+  let tarball = Tarball()
+  if err =? (await tarball.open(stream, onProcessedTarFile)).errorOption:
+    error "Unable to open tarball", err = err.msg
+    return failure(err)
+  echo "tarball = ", $tarball
+  without root =? tarball.findRootDir(), err:
+    return failure(err.msg)
+  echo "root = ", root
+  let dirs = processDirEntries(tarball)
+  echo "dirs = ", dirs
+  without tree =?
(await buildTree(root = root, dirs = dirs, onProcessedTarDir)), err: + error "Unable to build tree", err = err.msg + return failure(err) + echo "" + echo "preorderTraversal:" + let json = newJArray() + preorderTraversal(tree, json) + echo "json = ", json + success($json) diff --git a/codex/tarballs/tarballs.nim b/codex/tarballs/tarballs.nim new file mode 100644 index 000000000..b0ba59e9d --- /dev/null +++ b/codex/tarballs/tarballs.nim @@ -0,0 +1,282 @@ +{.push raises: [].} + +import std/os +import std/times +import std/strutils +import std/strformat +import std/sequtils +import std/streams +import std/tables + +import std/random + +import pkg/chronos +import pkg/questionable/results +import pkg/libp2p/[cid, multicodec, multihash] +import pkg/serde/json + +import ../blocktype +import ../manifest +import ./directorymanifest + +proc example2*(_: type Block, size: int = 4096): ?!Block = + let length = rand(size) + let bytes = newSeqWith(length, rand(uint8)) + Block.new(bytes) + +proc example2*(_: type Cid): ?!Cid = + Block.example2 .? cid + +const + TUREAD* = 0o00400'u32 # read by owner */ + TUWRITE* = 0o00200'u32 # write by owner */ + TUEXEC* = 0o00100'u32 # execute/search by owner */ + TGREAD* = 0o00040'u32 # read by group */ + TGWRITE* = 0o00020'u32 # write by group */ + TGEXEC* = 0o00010'u32 # execute/search by group */ + TOREAD* = 0o00004'u32 # read by other */ + TOWRITE* = 0o00002'u32 # write by other */ + TOEXEC* = 0o00001'u32 # execute/search by other */ + +type + EntryKind* = enum + ekNormalFile = '0' + ekDirectory = '5' + + TarballEntry* = object + kind*: EntryKind + name*: string + cid*: Cid + contentLength*: int + lastModified*: times.Time + permissions*: set[FilePermission] + + Tarball* = ref object + contents*: OrderedTable[string, TarballEntry] + + TarballError* = object of ValueError + + TarballTree* = ref object + name*: string + cid*: Cid + children*: seq[TarballTree] + + # ToDo: make sure we also record files permissions, modification time, etc... + # For now, only fileName so that we do not have to change the Codex manifest + # right away + OnProcessedTarFile* = proc(stream: Stream, fileName: string): Future[?!Cid] {. + gcsafe, async: (raises: [CancelledError]) + .} + + OnProcessedTarDir* = proc(name: string, cids: seq[Cid]): Future[?!Cid] {. + gcsafe, async: (raises: [CancelledError]) + .} + +proc `$`*(tarball: Tarball): string = + result = "Tarball with " & $tarball.contents.len & " entries" + for name, entry in tarball.contents.pairs: + var lastModified: string = "(unknown)" + try: + let lastModified = $entry.lastModified + except TimeFormatParseError: + discard + result.add( + "\n " & + fmt"{name}: name = {entry.name}, {entry.kind} ({entry.contentLength} bytes) @ {lastModified} [{entry.cid}]" + ) + +proc `$`*(tarballEntry: TarballEntry): string = + ## Returns a string representation of the tarball entry. + result = fmt"({tarballEntry.kind}, {tarballEntry.name})" + +proc parseFilePermissions*(permissions: uint32): set[FilePermission] = + if defined(windows) or permissions == 0: + # Ignore file permissions on Windows. If they are absent (.zip made on + # Windows for example), set default permissions. 
+ result.incl fpUserRead + result.incl fpUserWrite + result.incl fpGroupRead + result.incl fpOthersRead + else: + if (permissions and TUREAD) != 0: + result.incl(fpUserRead) + if (permissions and TUWRITE) != 0: + result.incl(fpUserWrite) + if (permissions and TUEXEC) != 0: + result.incl(fpUserExec) + if (permissions and TGREAD) != 0: + result.incl(fpGroupRead) + if (permissions and TGWRITE) != 0: + result.incl(fpGroupWrite) + if (permissions and TGEXEC) != 0: + result.incl(fpGroupExec) + if (permissions and TOREAD) != 0: + result.incl(fpOthersRead) + if (permissions and TOWRITE) != 0: + result.incl(fpOthersWrite) + if (permissions and TOEXEC) != 0: + result.incl(fpOthersExec) + +proc toUnixPath(path: string): string = + path.replace('\\', '/') + +proc clear*(tarball: Tarball) = + tarball.contents.clear() + +proc openStreamImpl( + tarball: Tarball, stream: Stream, onProcessedTarFile: OnProcessedTarFile = nil +): Future[?!void] {.async: (raises: []).} = + tarball.clear() + + proc trim(s: string): string = + for i in 0 ..< s.len: + if s[i] == '\0': + return s[0 ..< i] + s + + try: + var data = stream.readAll() # TODO: actually treat as a stream + + var pos: int + while pos < data.len: + if pos + 512 > data.len: + return failure("Attempted to read past end of file, corrupted tarball?") + + let + header = data[pos ..< pos + 512] + fileName = header[0 ..< 100].trim() + + pos += 512 + + if fileName.len == 0: + continue + + let + fileSize = + try: + parseOctInt(header[124 .. 134]) + except ValueError: + raise newException(TarballError, "Unexpected error while opening tarball") + lastModified = + try: + parseOctInt(header[136 .. 146]) + except ValueError: + raise newException(TarballError, "Unexpected error while opening tarball") + typeFlag = header[156] + fileMode = + try: + parseOctInt(header[100 ..< 106]) + except ValueError: + raise newException( + TarballError, "Unexpected error while opening tarball (mode)" + ) + fileNamePrefix = + if header[257 ..< 263] == "ustar\0": + header[345 ..< 500].trim() + else: + "" + + if pos + fileSize > data.len: + return failure("Attempted to read past end of file, corrupted tarball?") + + let normalizedFileName = normalizePathEnd(fileName) + + if typeFlag == '0' or typeFlag == '\0': + if not onProcessedTarFile.isNil: + let stream = newStringStream(data[pos ..< pos + fileSize]) + without cid =? 
+ await onProcessedTarFile(stream, normalizedFileName.lastPathPart), err: + return failure(err.msg) + tarball.contents[(fileNamePrefix / fileName).toUnixPath()] = TarballEntry( + kind: ekNormalFile, + name: normalizedFileName, + contentLength: fileSize, + cid: cid, + lastModified: initTime(lastModified, 0), + permissions: parseFilePermissions(cast[uint32](fileMode)), + ) + elif typeFlag == '5': + tarball.contents[normalizePathEnd((fileNamePrefix / fileName).toUnixPath())] = TarballEntry( + kind: ekDirectory, + name: normalizedFileName, + lastModified: initTime(lastModified, 0), + permissions: parseFilePermissions(cast[uint32](fileMode)), + ) + + # Move pos by fileSize, where fileSize is 512 byte aligned + pos += (fileSize + 511) and not 511 + success() + except CatchableError as e: + return failure(e.msg) + +proc open*( + tarball: Tarball, bytes: string, onProcessedTarFile: OnProcessedTarFile = nil +): Future[?!void] {.async: (raw: true, raises: []).} = + let stream = newStringStream(bytes) + tarball.openStreamImpl(stream, onProcessedTarFile) + +proc open*( + tarball: Tarball, stream: Stream, onProcessedTarFile: OnProcessedTarFile = nil +): Future[?!void] {.async: (raw: true, raises: []).} = + tarball.openStreamImpl(stream, onProcessedTarFile) + +proc processDirEntries*(tarball: Tarball): Table[string, seq[TarballEntry]] = + result = initTable[string, seq[TarballEntry]]() + for name, entry in tarball.contents.pairs: + let path = normalizePathEnd(name) + if not isRootDir(path): + let (head, _) = splitPath(path) + result.withValue(head, value): + value[].add(entry) + do: + result[head] = @[entry] + +proc findRootDir*(tarball: Tarball): ?!string = + var rootDir = "" + for entry in tarball.contents.values: + if entry.kind == ekDirectory: + if isRootDir(entry.name): + return success(entry.name) + failure("No root directory found in tarball") + +proc buildTree*( + root: string, + dirs: Table[string, seq[TarballEntry]], + onProcessedTarDir: OnProcessedTarDir = nil, +): Future[?!TarballTree] {.async: (raises: [CancelledError]).} = + let tree = TarballTree(name: root.lastPathPart, children: @[]) + let entries = dirs.getOrDefault(root) + for entry in entries: + if entry.kind == ekDirectory: + without subTree =? + await buildTree(root = entry.name, dirs = dirs, onProcessedTarDir), err: + return failure(err.msg) + # compute Cid for the subtree + # let cids = subTree.children.mapIt(it.cid) + # if not onProcessedTarDir.isNil: + # without cid =? await onProcessedTarDir(subTree.name, cids), err: + # return failure(err.msg) + # subTree.cid = cid + tree.children.add(subTree) + else: + let child = + TarballTree(name: entry.name.lastPathPart, children: @[], cid: entry.cid) + tree.children.add(child) + let cids = tree.children.mapIt(it.cid) + if not onProcessedTarDir.isNil: + without cid =? 
await onProcessedTarDir(tree.name, cids), err: + return failure(err.msg) + tree.cid = cid + success(tree) + +proc preorderTraversal*(root: TarballTree, json: JsonNode) = + echo root.name + let jsonObj = newJObject() + jsonObj["name"] = newJString(root.name) + jsonObj["cid"] = newJString($root.cid) + json.add(jsonObj) + if root.children.len > 0: + let jsonArray = newJArray() + jsonObj["children"] = jsonArray + for child in root.children: + preorderTraversal(child, jsonArray) diff --git a/tests/fixtures/tarballs/dir/dir1/file11.txt b/tests/fixtures/tarballs/dir/dir1/file11.txt new file mode 100644 index 000000000..f6854cfe7 --- /dev/null +++ b/tests/fixtures/tarballs/dir/dir1/file11.txt @@ -0,0 +1 @@ +File 11 diff --git a/tests/fixtures/tarballs/dir/dir1/file12.txt b/tests/fixtures/tarballs/dir/dir1/file12.txt new file mode 100644 index 000000000..e64f6a28a --- /dev/null +++ b/tests/fixtures/tarballs/dir/dir1/file12.txt @@ -0,0 +1 @@ +File 12 diff --git a/tests/fixtures/tarballs/dir/file1.txt b/tests/fixtures/tarballs/dir/file1.txt new file mode 100644 index 000000000..50fcd26d6 --- /dev/null +++ b/tests/fixtures/tarballs/dir/file1.txt @@ -0,0 +1 @@ +File 1 diff --git a/tests/fixtures/tarballs/dir/file2.txt b/tests/fixtures/tarballs/dir/file2.txt new file mode 100644 index 000000000..4475433e2 --- /dev/null +++ b/tests/fixtures/tarballs/dir/file2.txt @@ -0,0 +1 @@ +File 2 diff --git a/tests/fixtures/tarballs/testtarbar.tar b/tests/fixtures/tarballs/testtarbar.tar new file mode 100644 index 000000000..3fe060318 Binary files /dev/null and b/tests/fixtures/tarballs/testtarbar.tar differ
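
A minimal round-trip sketch for the new DirectoryManifest codec introduced above. The module import paths and the use of Cid.example2 (defined in codex/tarballs/tarballs.nim) are assumptions for illustration; only encode, DirectoryManifest.decode, and newDirectoryManifest are taken from the patch itself.

import pkg/questionable/results
import pkg/libp2p/cid

import codex/tarballs/[tarballs, directorymanifest, encoding, decoding]

proc directoryManifestRoundTrip*(): ?!void =
  # build a manifest with two random example cids (Cid.example2 from tarballs.nim)
  without cidA =? Cid.example2(), err:
    return failure(err.msg)
  without cidB =? Cid.example2(), err:
    return failure(err.msg)
  let original = newDirectoryManifest(name = "dir", cids = @[cidA, cidB])

  # encode to protobuf bytes, decode back, and compare field-by-field via `==`
  without decoded =? DirectoryManifest.decode(original.encode()), err:
    return failure(err.msg)
  if decoded != original:
    return failure("DirectoryManifest round-trip mismatch")
  success()

Per the code above, the Block-level decode overload additionally checks blk.cid.isManifest, so a block written by storeDirectoryManifest (which uses ManifestCodec) decodes through the same path.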