diff --git a/.github/scripts/fixtures.sh b/.github/scripts/fixtures.sh index 428b29d1..0aa61e6b 100755 --- a/.github/scripts/fixtures.sh +++ b/.github/scripts/fixtures.sh @@ -3,5 +3,13 @@ set -e hadoop fs -mkdir -p "/_test" hadoop fs -chmod 777 "/_test" +if [ "$TRANSPARENT_ENCRYPTION" = "true" ]; then + echo "Prepare encrypted zone" + hadoop fs -mkdir /_test/kms + hadoop fs -chmod 777 "/_test/kms" + hadoop key create key1 + hdfs crypto -createZone -keyName key1 -path /_test/kms +fi + hadoop fs -put ./testdata/foo.txt "/_test/foo.txt" hadoop fs -Ddfs.block.size=1048576 -put ./testdata/mobydick.txt "/_test/mobydick.txt" diff --git a/.github/scripts/install-hdfs.sh b/.github/scripts/install-hdfs.sh index 77d8803e..9b3af94d 100755 --- a/.github/scripts/install-hdfs.sh +++ b/.github/scripts/install-hdfs.sh @@ -2,7 +2,7 @@ set -e -KERBEROS=${KERBEROS-"false"} +KERBEROS="${KERBEROS-false}" AES=${AES-"false"} if [ "$DATA_TRANSFER_PROTECTION" = "privacy" ]; then KERBEROS="true" @@ -15,11 +15,18 @@ else ENCRYPT_DATA_TRANSFER="false" fi +CONF_KMS_PROVIDER="" +TRANSPARENT_ENCRYPTION=false +if [ "$HADOOP_VERSION" != "2.10.1" ]; then + TRANSPARENT_ENCRYPTION=true + CONF_KMS_PROVIDER="kms://http@localhost:9600/kms" +fi + CONF_AUTHENTICATION="simple" KERBEROS_REALM="EXAMPLE.COM" KERBEROS_PRINCIPLE="administrator" KERBEROS_PASSWORD="password1234" -if [ $KERBEROS = "true" ]; then +if [ "$KERBEROS" = "true" ]; then CONF_AUTHENTICATION="kerberos" HOSTNAME=$(hostname) @@ -50,13 +57,16 @@ EOF sudo apt-get install -y krb5-user krb5-kdc krb5-admin-server printf "$KERBEROS_PASSWORD\n$KERBEROS_PASSWORD" | sudo kdb5_util -r "$KERBEROS_REALM" create -s - for p in nn dn $USER gohdfs1 gohdfs2; do + for p in nn dn kms $USER gohdfs1 gohdfs2; do sudo kadmin.local -q "addprinc -randkey $p/$HOSTNAME@$KERBEROS_REALM" sudo kadmin.local -q "addprinc -randkey $p/localhost@$KERBEROS_REALM" sudo kadmin.local -q "xst -k /tmp/$p.keytab $p/$HOSTNAME@$KERBEROS_REALM" sudo kadmin.local -q "xst -k 
/tmp/$p.keytab $p/localhost@$KERBEROS_REALM" sudo chmod +rx /tmp/$p.keytab done + # HTTP service for KMS + sudo kadmin.local -q "addprinc -randkey HTTP/localhost@$KERBEROS_REALM" + sudo kadmin.local -q "xst -k /tmp/kms.keytab HTTP/localhost@$KERBEROS_REALM" echo "Restarting krb services..." sudo service krb5-kdc restart @@ -116,6 +126,10 @@ sudo tee $HADOOP_ROOT/etc/hadoop/core-site.xml <hadoop.rpc.protection $RPC_PROTECTION + + hadoop.security.key.provider.path + $CONF_KMS_PROVIDER + EOF @@ -125,6 +139,10 @@ sudo tee $HADOOP_ROOT/etc/hadoop/hdfs-site.xml <dfs.namenode.name.dir /tmp/hdfs/name + + dfs.namenode.fs-limits.min-block-size + 131072 + dfs.datanode.data.dir /tmp/hdfs/data @@ -172,6 +190,41 @@ $HADOOP_ROOT/bin/hdfs namenode -format sudo groupadd hadoop sudo usermod -a -G hadoop $USER +sudo tee $HADOOP_ROOT/etc/hadoop/kms-site.xml < + + hadoop.kms.key.provider.uri + jceks://file@/tmp/hdfs/kms.keystore + + + hadoop.security.keystore.java-keystore-provider.password-file + kms.keystore.password + + + hadoop.kms.authentication.type + $CONF_AUTHENTICATION + + + hadoop.kms.authentication.kerberos.keytab + /tmp/kms.keytab + + + hadoop.kms.authentication.kerberos.principal + HTTP/localhost@$KERBEROS_REALM + + +EOF + +sudo tee $HADOOP_ROOT/etc/hadoop/kms.keystore.password < /tmp/hdfs/kms.log 2>&1 & +fi + echo "Starting namenode..." $HADOOP_ROOT/bin/hdfs namenode > /tmp/hdfs/namenode.log 2>&1 & @@ -184,4 +237,5 @@ echo "Waiting for cluster to exit safe mode..." 
// calculateIV returns the AES-CTR counter block to use when starting to
// encrypt or decrypt at the given byte offset of a stream whose first block
// uses initIV. It mirrors calculateIV from Hadoop's AesCtrCryptoCodec.java.
//
// initIV is treated as a 128-bit big-endian integer, and the number of whole
// AES blocks that precede offset is added to it. Bytes of offset that fall
// inside a block (offset % aes.BlockSize) are NOT accounted for here; the
// caller has to discard that many keystream bytes itself.
//
// The result is always a freshly allocated slice; initIV is never modified.
// An error is returned when initIV is not exactly aes.BlockSize bytes long or
// when offset is negative.
func calculateIV(offset int64, initIV []byte) ([]byte, error) {
	if len(initIV) != aes.BlockSize {
		return nil, fmt.Errorf("calculateIV: invalid iv size: %v", len(initIV))
	}
	// A negative offset would wrap around to an enormous unsigned counter and
	// silently yield an IV for the wrong position.
	if offset < 0 {
		return nil, fmt.Errorf("calculateIV: negative offset: %v", offset)
	}

	counter := uint64(offset / aes.BlockSize)

	high := binary.BigEndian.Uint64(initIV[:8])
	low := binary.BigEndian.Uint64(initIV[8:])

	sum := low + counter
	if sum < low {
		// The low 64 bits overflowed: carry into the high 64 bits.
		high++
	}

	iv := make([]byte, aes.BlockSize)
	binary.BigEndian.PutUint64(iv[:8], high)
	binary.BigEndian.PutUint64(iv[8:], sum)

	return iv, nil
}
+ encryptedText, err := aesCtrStep(0, enc, originalText) + assert.Equal(t, err, nil) + decryptedText, err := aesCtrStep(0, enc, encryptedText) + assert.Equal(t, err, nil) + assert.Equal(t, originalText, decryptedText) + + // CTR mode allow us to encrypt/decrypt string by chunks + // (using correct offset from start of string). + // Ensure that result equal to one, produced in one step. + encryptedByChunks := make([]byte, 0) + var pos int64 = 0 + for _, x := range []int{5, 7, 6, 4, 28} { + tmp, err := aesCtrStep(pos, enc, originalText[pos:pos+int64(x)]) + assert.Equal(t, err, nil) + encryptedByChunks = append(encryptedByChunks, tmp...) + pos += int64(x) + } + assert.Equal(t, encryptedByChunks, encryptedText) + + // Decrypt string by chunks. + // Ensure that result equal to one, produced in one step. + decryptedByChunks := make([]byte, 0) + pos = 0 + for _, x := range []int{5, 7, 6, 4, 28} { + tmp, err := aesCtrStep(pos, enc, encryptedText[pos:pos+int64(x)]) + assert.Equal(t, err, nil) + decryptedByChunks = append(decryptedByChunks, tmp...) + pos += int64(x) + } + assert.Equal(t, decryptedByChunks, decryptedText) +} diff --git a/client.go b/client.go index 4e8ed316..5d0c2f96 100644 --- a/client.go +++ b/client.go @@ -6,6 +6,8 @@ import ( "io" "io/ioutil" "net" + "net/http" + "net/http/cookiejar" "os" "os/user" "sort" @@ -36,6 +38,8 @@ type Client struct { defaults *hdfs.FsServerDefaultsProto encryptionKey *hdfs.DataEncryptionKeyProto + + http *http.Client } // ClientOptions represents the configurable options for a client. @@ -203,7 +207,16 @@ func NewClient(options ClientOptions) (*Client, error) { return nil, err } - return &Client{namenode: namenode, options: options}, nil + // We need cookies to access KMS (required for HDFS encrypted zone). + jar, err := cookiejar.New(nil) + if err != nil { + return nil, errors.New("cant create cookie jar") + } + + // Not extending ClientOptions to preserve compatibility, so timeouts not configured. 
+ http := &http.Client{Jar: jar} + + return &Client{namenode: namenode, options: options, http: http}, nil } // New returns Client connected to the namenode(s) specified by address, or an diff --git a/cmd/hdfs/test/helper.bash b/cmd/hdfs/test/helper.bash index 37a45241..d8d0552e 100644 --- a/cmd/hdfs/test/helper.bash +++ b/cmd/hdfs/test/helper.bash @@ -1,6 +1,7 @@ #!/bin/bash export HADOOP_FS=${HADOOP_FS-"hadoop fs"} +export HADOOP_KEY=${HADOOP_KEY-"hadoop key"} export ROOT_TEST_DIR="$BATS_TEST_DIRNAME/../../.." export HDFS="$ROOT_TEST_DIR/hdfs" diff --git a/cmd/hdfs/test/te.bats b/cmd/hdfs/test/te.bats new file mode 100644 index 00000000..74dffa46 --- /dev/null +++ b/cmd/hdfs/test/te.bats @@ -0,0 +1,66 @@ +#!/usr/bin/env bats + +load helper + +@test "te: upload via native client, ensure we can download" { + if [ "$TRANSPARENT_ENCRYPTION" = "true" ]; then + run $HADOOP_FS -put $ROOT_TEST_DIR/testdata/foo.txt /_test/kms/foo1 + assert_success + run $HDFS cat /_test/kms/foo1 + assert_output "bar" + else + skip + fi +} + +@test "te: ensure native client can download once we uploaded to encrypted zone" { + if [ "$TRANSPARENT_ENCRYPTION" = "true" ]; then + run $HDFS put $ROOT_TEST_DIR/testdata/foo.txt /_test/kms/foo2 + assert_success + run $HADOOP_FS -cat /_test/kms/foo2 + assert_output "bar" + else + skip + fi +} + +@test "te: tail" { + if [ "$TRANSPARENT_ENCRYPTION" = "true" ]; then + run $HDFS put $ROOT_TEST_DIR/testdata/mobydick.txt /_test/kms/ + assert_success + run bash -c "$HDFS tail /_test/kms/mobydick.txt > $BATS_TMPDIR/mobydick_test.txt" + assert_success + SHA=`tail $ROOT_TEST_DIR/testdata/mobydick.txt | shasum | awk '{ print $1 }'` + assert_equal $SHA `shasum < $BATS_TMPDIR/mobydick_test.txt | awk '{ print $1 }'` + else + skip + fi +} + +@test "te: key not available" { + if [ "$TRANSPARENT_ENCRYPTION" = "true" ]; then + run $HADOOP_FS -mkdir -p /_test/kms-no-key + assert_success + run $HADOOP_KEY create key-removed + assert_success + run hdfs crypto 
-createZone -keyName key-removed -path /_test/kms-no-key + assert_success + run $HADOOP_FS -put $ROOT_TEST_DIR/testdata/foo.txt /_test/kms-no-key/foo + assert_success + run $HADOOP_KEY delete key-removed -f + assert_success + run $HDFS cat /_test/kms-no-key/foo + assert_failure + assert_output "open /_test/kms-no-key/foo: kms: 'key-removed@0' not found" + + run $HDFS put $ROOT_TEST_DIR/testdata/foo.txt /_test/kms-no-key/foo2 + assert_failure + assert_output "create /_test/kms-no-key/foo2: kms: 'key-removed@0' not found" + + run $HDFS ls /_test/kms-no-key/foo2 + assert_failure + assert_output "stat /_test/kms-no-key/foo2: file does not exist" + else + skip + fi +} diff --git a/file_reader.go b/file_reader.go index e1e18798..bfe699d1 100644 --- a/file_reader.go +++ b/file_reader.go @@ -1,6 +1,7 @@ package hdfs import ( + "crypto/cipher" "crypto/md5" "errors" "fmt" @@ -29,6 +30,17 @@ type FileReader struct { readdirLast string closed bool + + // encryption + enc *transparentEncryptionInfo +} + +// A transparentEncryptionInfo is a key and iv to encrypt or decrypt file data +type transparentEncryptionInfo struct { + key []byte + iv []byte + cipher cipher.Block + stream cipher.Stream } // Open returns an FileReader which can be used for reading. 
@@ -38,11 +50,25 @@ func (c *Client) Open(name string) (*FileReader, error) { return nil, &os.PathError{"open", name, interpretException(err)} } + status, ok := info.Sys().(*FileStatus) + if !ok { + return nil, &os.PathError{"open", name, errors.New("internal error: fail to access file status")} + } + + var enc *transparentEncryptionInfo + if status.FileEncryptionInfo != nil { + enc, err = c.kmsGetKey(status.FileEncryptionInfo) + if err != nil { + return nil, &os.PathError{"open", name, err} + } + } + return &FileReader{ client: c, name: name, info: info, closed: false, + enc: enc, }, nil } @@ -159,6 +185,12 @@ func (f *FileReader) Seek(offset int64, whence int) (int64, error) { if f.offset != off { f.offset = off + // To make things simpler, we just destroy cipher.Stream (if any) + // It will be recreated in Read() + if f.enc != nil { + f.enc.stream = nil + } + if f.blockReader != nil { // If the seek is within the next few chunks, it's much more // efficient to throw away a few bytes than to reconnect and start @@ -215,7 +247,21 @@ func (f *FileReader) Read(b []byte) (int, error) { } } - n, err := f.blockReader.Read(b) + var n int + var err error + + if f.enc != nil { + if f.enc.stream == nil { + f.enc.stream, err = aesCreateCTRStream(f.offset, f.enc) + if err != nil { + return 0, err + } + } + n, err = cipher.StreamReader{S: f.enc.stream, R: f.blockReader}.Read(b) + } else { + n, err = f.blockReader.Read(b) + } + f.offset += int64(n) if err != nil && err != io.EOF { diff --git a/file_writer.go b/file_writer.go index e5877f1c..c83b5788 100644 --- a/file_writer.go +++ b/file_writer.go @@ -1,6 +1,7 @@ package hdfs import ( + "crypto/cipher" "errors" "os" "time" @@ -28,9 +29,13 @@ type FileWriter struct { replication int blockSize int64 fileId *uint64 + offset int64 blockWriter *transfer.BlockWriter deadline time.Time + + // Key and IV for transparent encryption support. 
+ enc *transparentEncryptionInfo } // Create opens a new file in HDFS with the default replication, block size, @@ -63,13 +68,14 @@ func (c *Client) Create(name string) (*FileWriter, error) { // very important that Close is called after all data has been written. func (c *Client) CreateFile(name string, replication int, blockSize int64, perm os.FileMode) (*FileWriter, error) { createReq := &hdfs.CreateRequestProto{ - Src: proto.String(name), - Masked: &hdfs.FsPermissionProto{Perm: proto.Uint32(uint32(perm))}, - ClientName: proto.String(c.namenode.ClientName), - CreateFlag: proto.Uint32(1), - CreateParent: proto.Bool(false), - Replication: proto.Uint32(uint32(replication)), - BlockSize: proto.Uint64(uint64(blockSize)), + Src: proto.String(name), + Masked: &hdfs.FsPermissionProto{Perm: proto.Uint32(uint32(perm))}, + ClientName: proto.String(c.namenode.ClientName), + CreateFlag: proto.Uint32(1), + CreateParent: proto.Bool(false), + Replication: proto.Uint32(uint32(replication)), + BlockSize: proto.Uint64(uint64(blockSize)), + CryptoProtocolVersion: []hdfs.CryptoProtocolVersionProto{hdfs.CryptoProtocolVersionProto_ENCRYPTION_ZONES}, } createResp := &hdfs.CreateResponseProto{} @@ -78,12 +84,22 @@ func (c *Client) CreateFile(name string, replication int, blockSize int64, perm return nil, &os.PathError{"create", name, interpretCreateException(err)} } + var enc *transparentEncryptionInfo + if createResp.GetFs().GetFileEncryptionInfo() != nil { + enc, err = c.kmsGetKey(createResp.GetFs().GetFileEncryptionInfo()) + if err != nil { + c.Remove(name) + return nil, &os.PathError{"create", name, err} + } + } + return &FileWriter{ client: c, name: name, replication: replication, blockSize: blockSize, fileId: createResp.Fs.FileId, + enc: enc, }, nil } @@ -108,12 +124,22 @@ func (c *Client) Append(name string) (*FileWriter, error) { return nil, &os.PathError{"append", name, interpretException(err)} } + var enc *transparentEncryptionInfo + if 
appendResp.GetStat().GetFileEncryptionInfo() != nil { + enc, err = c.kmsGetKey(appendResp.GetStat().GetFileEncryptionInfo()) + if err != nil { + return nil, &os.PathError{"append", name, err} + } + } + f := &FileWriter{ client: c, name: name, replication: int(appendResp.Stat.GetBlockReplication()), blockSize: int64(appendResp.Stat.GetBlocksize()), fileId: appendResp.Stat.FileId, + offset: int64(*appendResp.Stat.Length), + enc: enc, } // This returns nil if there are no blocks (it's an empty file) or if the @@ -188,8 +214,27 @@ func (f *FileWriter) Write(b []byte) (int, error) { off := 0 for off < len(b) { - n, err := f.blockWriter.Write(b[off:]) + var n int + var err error + + if f.enc != nil { + if f.enc.stream == nil { + f.enc.stream, err = aesCreateCTRStream(f.offset, f.enc) + if err != nil { + return 0, err + } + } + n, err = cipher.StreamWriter{S: f.enc.stream, W: f.blockWriter}.Write(b[off:]) + // If blockWriter writes less than expected bytes, + // we must recreate stream chipher, since it's internal counter goes forward. 
+ if n != len(b[off:]) { + f.enc.stream = nil + } + } else { + n, err = f.blockWriter.Write(b[off:]) + } off += n + f.offset += int64(n) if err == transfer.ErrEndOfBlock { err = f.startNewBlock() } diff --git a/file_writer_test.go b/file_writer_test.go index 38eb8a6a..fbc2d02e 100644 --- a/file_writer_test.go +++ b/file_writer_test.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "math/rand" "os" + "os/exec" "path/filepath" "strings" "testing" @@ -531,3 +532,124 @@ func TestFileAppendDeadlineBefore(t *testing.T) { _, err = writer.Write([]byte("foo\n")) assert.Error(t, err) } + +func skipWithoutEncryptedZone(t *testing.T) { + if os.Getenv("TRANSPARENT_ENCRYPTION") != "true" { + t.Skip("Skipping, this test requires encryption zone to make sense") + } +} + +func TestEncryptedZoneWriteChunks(t *testing.T) { + skipWithoutEncryptedZone(t) + + originalText := []byte("some random plain text, nice to have it quite long") + client := getClient(t) + writer, err := client.Create("/_test/kms/write_chunks.txt") + require.NoError(t, err) + + var pos int64 = 0 + for _, x := range []int{5, 7, 6, 4, 28} { + _, err = writer.Write(originalText[pos : pos+int64(x)]) + require.NoError(t, err) + pos += int64(x) + } + assertClose(t, writer) + + reader, err := client.Open("/_test/kms/write_chunks.txt") + require.NoError(t, err) + + bytes, err := ioutil.ReadAll(reader) + require.NoError(t, err) + assert.Equal(t, originalText, bytes) + + hdfsOut, err := exec.Command("hadoop", "dfs", "-cat", "/_test/kms/write_chunks.txt").Output() + require.NoError(t, err) + assert.Equal(t, originalText, hdfsOut) +} + +func TestEncryptedZoneAppendChunks(t *testing.T) { + skipWithoutEncryptedZone(t) + + originalText := []byte("some random plain text, nice to have it quite long") + client := getClient(t) + writer, err := client.Create("/_test/kms/append_chunks.txt") + require.NoError(t, err) + assertClose(t, writer) + + var pos int64 = 0 + for _, x := range []int{5, 7, 6, 4, 28} { + writer, err := 
client.Append("/_test/kms/append_chunks.txt") + require.NoError(t, err) + _, err = writer.Write(originalText[pos : pos+int64(x)]) + require.NoError(t, err) + pos += int64(x) + assertClose(t, writer) + } + + reader, err := client.Open("/_test/kms/append_chunks.txt") + require.NoError(t, err) + bytes, err := ioutil.ReadAll(reader) + require.NoError(t, err) + assert.Equal(t, originalText, bytes) + + hdfsOut, err := exec.Command("hadoop", "dfs", "-cat", "/_test/kms/append_chunks.txt").Output() + require.NoError(t, err) + assert.Equal(t, originalText, hdfsOut) +} + +func TestEncryptedZoneLargeBlock(t *testing.T) { + skipWithoutEncryptedZone(t) + + // Generate quite large data block, so we can trigger encryption in chunks. + mobydick, err := os.Open("testdata/mobydick.txt") + require.NoError(t, err) + originalText, err := ioutil.ReadAll(mobydick) + require.NoError(t, err) + client := getClient(t) + + // Create file with small (128Kb) block size, so encrypted chunk will be placed over multiple hdfs blocks. 
+ writer, err := client.CreateFile("/_test/kms/mobydick.unittest", 1, 131072, 0755) + require.NoError(t, err) + + _, err = writer.Write(originalText) + require.NoError(t, err) + assertClose(t, writer) + + reader, err := client.Open("/_test/kms/mobydick.unittest") + require.NoError(t, err) + bytes, err := ioutil.ReadAll(reader) + require.NoError(t, err) + assert.Equal(t, originalText, bytes) + + // Ensure read after seek works as expected: + _, err = reader.Seek(35657, io.SeekStart) + require.NoError(t, err) + bytes = make([]byte, 64) + _, err = reader.Read(bytes) + require.NoError(t, err) + assert.Equal(t, []byte("By reason of these things, then, the whaling voyage was welcome;"), bytes) + + hdfsOut, err := exec.Command("hadoop", "dfs", "-cat", "/_test/kms/mobydick.unittest").Output() + require.NoError(t, err) + assert.Equal(t, originalText, hdfsOut) +} + +func TestEncryptedZoneReadAfterJava(t *testing.T) { + skipWithoutEncryptedZone(t) + + err := exec.Command("hadoop", "dfs", "-copyFromLocal", "testdata/mobydick.txt", "/_test/kms/mobydick.java").Run() + require.NoError(t, err) + + mobydick, err := os.Open("testdata/mobydick.txt") + require.NoError(t, err) + originalText, err := ioutil.ReadAll(mobydick) + require.NoError(t, err) + + client := getClient(t) + reader, err := client.Open("/_test/kms/mobydick.java") + require.NoError(t, err) + bytes, err := ioutil.ReadAll(reader) + require.NoError(t, err) + + assert.Equal(t, originalText, bytes) +} diff --git a/go.mod b/go.mod index 461445d2..7e904baa 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/jcmturner/gofork v1.0.0 // indirect github.com/jcmturner/goidentity/v6 v6.0.1 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9 // indirect golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect diff --git a/go.sum b/go.sum index 
7c2a2f0c..18bb2c99 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,8 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/pborman/getopt v1.1.0 h1:eJ3aFZroQqq0bWmraivjQNt6Dmm5M0h2JcDW38/Azb0= github.com/pborman/getopt v1.1.0/go.mod h1:FxXoW1Re00sQG/+KIkuSqRL/LwQgSkv7uyac+STFsbk= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/kms.go b/kms.go new file mode 100644 index 00000000..fcc07193 --- /dev/null +++ b/kms.go @@ -0,0 +1,236 @@ +package hdfs + +import ( + "bytes" + "crypto/aes" + "io/ioutil" + "math/rand" + + "encoding/base64" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + + "github.com/pkg/errors" + + hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs" + spnego "github.com/jcmturner/gokrb5/v8/spnego" +) + +const ( + kmsSchemeHTTP = "kms://http@" + kmsSchemeHTTPS = "kms://https@" +) + +func (c *Client) kmsAuth(url string) error { + if c.options.KerberosClient == nil { + url += ("&user.name=" + c.options.User) + } + + req, err := http.NewRequest("OPTIONS", url, nil) + if err != nil { + return err + } + + var resp *http.Response + if c.options.KerberosClient != nil { + kHttp := spnego.NewClient(c.options.KerberosClient, c.http, "") + resp, err = kHttp.Do(req) + } else { + resp, err = c.http.Do(req) + } + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return fmt.Errorf("bad kms auth status: %v", resp.StatusCode) + } + return nil +} + +// parse uri like 
kms://https@kms01.example.com;kms02.example.com:9600/kms +func kmsParseProviderUri(uri string) ([]string, error) { + original_uri := uri + + if uri == "" { + return nil, errors.New("KeyProviderUri empty. not configured on server ?") + } + + var urls []string + var proto string + if strings.HasPrefix(uri, kmsSchemeHTTPS) { + proto = "https://" + uri = uri[len(kmsSchemeHTTPS):] + } + if proto == "" && strings.HasPrefix(uri, kmsSchemeHTTP) { + proto = "http://" + uri = uri[len(kmsSchemeHTTP):] + } + if proto == "" { + return nil, fmt.Errorf("not supported uri %v", original_uri) + } + + port := ":9600" // default kms port + path := "" // default path + + parts := strings.Split(uri, ";") + for i, s := range parts { + path_index := strings.Index(s, "/") + if path_index > -1 { + path = s[path_index:] + s = s[:path_index] + } + port_index := strings.Index(s, ":") + if port_index > -1 { + port = s[port_index:] + s = s[:port_index] + } + if (path_index > -1 || port_index > -1) && i+1 != len(parts) { + return nil, fmt.Errorf("bad uri: %v", original_uri) + } + urls = append(urls, proto+s) + } + + for i := range urls { + urls[i] += port + urls[i] += path + } + + return urls, nil +} + +// kmsUrl parse KeyProviderUri to list of URL's +func (c *Client) kmsUrl(einfo *hdfs.FileEncryptionInfoProto) ([]string, error) { + defaults, err := c.fetchDefaults() + if err != nil { + return nil, err + } + + urls, err := kmsParseProviderUri(defaults.GetKeyProviderUri()) + if err != nil { + return nil, err + } + + // Reorder urls. Simple method to round robin calls across em. 
+ rand.Shuffle(len(urls), func(i, j int) { urls[i], urls[j] = urls[j], urls[i] }) + + for i := range urls { + urls[i] = urls[i] + "/v1/keyversion/" + url.QueryEscape(*einfo.EzKeyVersionName) + "/_eek?eek_op=decrypt" + } + + return urls, nil +} + +func (c *Client) kmsRequest(url string, requestBody []byte) ([]byte, int, error) { + resp, err := c.http.Post(url, "application/json", bytes.NewBuffer(requestBody)) + if err != nil { + return nil, 0, err + } + defer resp.Body.Close() + + var responseBody []byte + responseBody, err = ioutil.ReadAll(resp.Body) + if err != nil { + return nil, 0, err + } + + return responseBody, resp.StatusCode, nil +} + +func (c *Client) kmsDecrypt(url string, requestBody []byte) ([]byte, error) { + responseBody, statusCode, err := c.kmsRequest(url, requestBody) + if err != nil { + return nil, err + } + + if statusCode == 401 { + err = c.kmsAuth(url) + if err != nil { + return nil, err + } + // retry with cookie + responseBody, statusCode, err = c.kmsRequest(url, requestBody) + if err != nil { + return nil, err + } + } + + if statusCode != 200 { + // On error, kms respond with error message in JSON object. 
+ type Exception struct { + RemoteException struct { + Message string `json:"message"` + } + } + var kmsException Exception + if err = json.Unmarshal(responseBody, &kmsException); err == nil && kmsException.RemoteException.Message != "" { + errorMessage := kmsException.RemoteException.Message + err = errors.New(errorMessage) + } else { + err = fmt.Errorf("unexpected response code from KMS: %v", statusCode) + } + return nil, err + } + + type KmsRespose struct { + Key string `json:"material"` + } + var kmsResponseJson KmsRespose + if err = json.Unmarshal(responseBody, &kmsResponseJson); err != nil { + return nil, err + } + + var key []byte + key, err = base64.RawURLEncoding.DecodeString(kmsResponseJson.Key) + if err != nil { + return nil, err + } + if len(key) != aes.BlockSize { + return nil, fmt.Errorf("unexpected key size from KMS: %v", len(key)) + } + + return key, nil +} + +func (c *Client) kmsGetKey(einfo *hdfs.FileEncryptionInfoProto) (*transparentEncryptionInfo, error) { + if einfo.GetCryptoProtocolVersion() != hdfs.CryptoProtocolVersionProto_ENCRYPTION_ZONES { + return nil, fmt.Errorf("not supported CryptoProtocolVersion %v", einfo.CryptoProtocolVersion) + } + if einfo.GetSuite() != hdfs.CipherSuiteProto_AES_CTR_NOPADDING { + return nil, fmt.Errorf("not supported CipherSuiteProto %v", einfo.Suite) + } + + urls, err := c.kmsUrl(einfo) + if err != nil { + return nil, errors.Wrap(err, "fail to get KMS address") + } + + requestBody, err := json.Marshal(map[string]string{ + "material": base64.URLEncoding.EncodeToString(einfo.Key), + "iv": base64.URLEncoding.EncodeToString(einfo.Iv), + "name": *einfo.KeyName}) + if err != nil { + return nil, err + } + + var key []byte + for _, url := range urls { + key, err = c.kmsDecrypt(url, requestBody) + if err == nil { + break + } + } + + if err != nil { + return nil, errors.Wrap(err, "kms") + } + + return &transparentEncryptionInfo{ + key: key, + iv: einfo.Iv, + }, nil +} diff --git a/kms_test.go b/kms_test.go new file mode 
100644 index 00000000..c571cb24 --- /dev/null +++ b/kms_test.go @@ -0,0 +1,53 @@ +package hdfs + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestKmsParseProviderUri(t *testing.T) { + assert.Equal(t, nil, nil) + + urls, err := kmsParseProviderUri("") + assert.Error(t, err) + + urls, err = kmsParseProviderUri("http") + assert.Error(t, err) + + urls, err = kmsParseProviderUri("kms://https@localhost:9600/kms") + assert.NoError(t, err) + assert.Equal(t, 1, len(urls)) + assert.Equal(t, "https://localhost:9600/kms", urls[0]) + + urls, err = kmsParseProviderUri("kms://http@kms01.example.com:9600;kms02.example.com") + assert.Error(t, err) + + urls, err = kmsParseProviderUri("kms://http@kms01.example.com/kms;kms02.example.com") + assert.Error(t, err) + + urls, err = kmsParseProviderUri("kms://http@kms01.example.com;kms02.example.com:9600/kms") + assert.NoError(t, err) + assert.Equal(t, 2, len(urls)) + assert.Equal(t, "http://kms01.example.com:9600/kms", urls[0]) + assert.Equal(t, "http://kms02.example.com:9600/kms", urls[1]) + + urls, err = kmsParseProviderUri("kms://http@kms01.example.com;kms02.example.com/kms") + assert.NoError(t, err) + assert.Equal(t, 2, len(urls)) + assert.Equal(t, "http://kms01.example.com:9600/kms", urls[0]) + assert.Equal(t, "http://kms02.example.com:9600/kms", urls[1]) + + urls, err = kmsParseProviderUri("kms://http@kms01.example.com;kms02.example.com:9600") + assert.NoError(t, err) + assert.Equal(t, 2, len(urls)) + assert.Equal(t, "http://kms01.example.com:9600", urls[0]) + assert.Equal(t, "http://kms02.example.com:9600", urls[1]) + + urls, err = kmsParseProviderUri("kms://http@kms01.example.com;kms02.example.com;kms03.example.com") + assert.NoError(t, err) + assert.Equal(t, 3, len(urls)) + assert.Equal(t, "http://kms01.example.com:9600", urls[0]) + assert.Equal(t, "http://kms02.example.com:9600", urls[1]) + assert.Equal(t, "http://kms03.example.com:9600", urls[2]) +}