Skip to content

Commit d34f148

Browse files
committed
Implement PSS-related methods in grpcwrap package
1 parent fc108f9 commit d34f148

File tree

1 file changed

+245
-8
lines changed

1 file changed

+245
-8
lines changed

internal/grpcwrap/provider6.go

Lines changed: 245 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
package grpcwrap
55

66
import (
7+
"bytes"
78
"context"
9+
"errors"
810
"fmt"
11+
"io"
912

1013
"github.com/zclconf/go-cty/cty"
1114
"github.com/zclconf/go-cty/cty/function"
@@ -15,6 +18,9 @@ import (
1518
"google.golang.org/grpc/status"
1619
"google.golang.org/protobuf/types/known/timestamppb"
1720

21+
proto6 "github.com/hashicorp/terraform/internal/tfplugin6"
22+
23+
backendPluggable "github.com/hashicorp/terraform/internal/backend/pluggable"
1824
"github.com/hashicorp/terraform/internal/plugin6/convert"
1925
"github.com/hashicorp/terraform/internal/providers"
2026
"github.com/hashicorp/terraform/internal/tfplugin6"
@@ -31,11 +37,15 @@ func Provider6(p providers.Interface) tfplugin6.ProviderServer {
3137
}
3238
}
3339

40+
var _ tfplugin6.ProviderServer = &provider6{}
41+
3442
type provider6 struct {
3543
provider providers.Interface
3644
schema providers.GetProviderSchemaResponse
3745
identitySchemas providers.GetResourceIdentitySchemasResponse
3846

47+
chunkSize int64
48+
3949
tfplugin6.UnimplementedProviderServer
4050
}
4151

@@ -902,35 +912,262 @@ func (p *provider6) ListResource(req *tfplugin6.ListResource_Request, res tfplug
902912
}
903913

904914
func (p *provider6) ValidateStateStoreConfig(ctx context.Context, req *tfplugin6.ValidateStateStore_Request) (*tfplugin6.ValidateStateStore_Response, error) {
905-
panic("not implemented")
915+
resp := &tfplugin6.ValidateStateStore_Response{}
916+
917+
s, ok := p.schema.StateStores[req.TypeName]
918+
if !ok {
919+
diag := &tfplugin6.Diagnostic{
920+
Severity: tfplugin6.Diagnostic_ERROR,
921+
Summary: "Unsupported state store type",
922+
Detail: fmt.Sprintf("This provider doesn't include a state store called %q", req.TypeName),
923+
}
924+
resp.Diagnostics = append(resp.Diagnostics, diag)
925+
return resp, nil
926+
}
927+
ty := s.Body.ImpliedType()
928+
929+
configVal, err := decodeDynamicValue6(req.Config, ty)
930+
if err != nil {
931+
resp.Diagnostics = convert.AppendProtoDiag(resp.Diagnostics, err)
932+
return resp, nil
933+
}
934+
935+
prepareResp := p.provider.ValidateStateStoreConfig(providers.ValidateStateStoreConfigRequest{
936+
TypeName: req.TypeName,
937+
Config: configVal,
938+
})
939+
940+
resp.Diagnostics = convert.AppendProtoDiag(resp.Diagnostics, prepareResp.Diagnostics)
941+
return resp, nil
906942
}
907943

908944
func (p *provider6) ConfigureStateStore(ctx context.Context, req *tfplugin6.ConfigureStateStore_Request) (*tfplugin6.ConfigureStateStore_Response, error) {
909-
panic("not implemented")
945+
resp := &tfplugin6.ConfigureStateStore_Response{}
946+
947+
s, ok := p.schema.StateStores[req.TypeName]
948+
if !ok {
949+
diag := &tfplugin6.Diagnostic{
950+
Severity: tfplugin6.Diagnostic_ERROR,
951+
Summary: "Unsupported state store type",
952+
Detail: fmt.Sprintf("This provider doesn't include a state store called %q", req.TypeName),
953+
}
954+
resp.Diagnostics = append(resp.Diagnostics, diag)
955+
return resp, nil
956+
}
957+
ty := s.Body.ImpliedType()
958+
959+
configVal, err := decodeDynamicValue6(req.Config, ty)
960+
if err != nil {
961+
resp.Diagnostics = convert.AppendProtoDiag(resp.Diagnostics, err)
962+
return resp, nil
963+
}
964+
965+
configureResp := p.provider.ConfigureStateStore(providers.ConfigureStateStoreRequest{
966+
TypeName: req.TypeName,
967+
Config: configVal,
968+
Capabilities: providers.StateStoreClientCapabilities{
969+
ChunkSize: backendPluggable.DefaultStateStoreChunkSize,
970+
},
971+
})
972+
973+
// Validate the returned chunk size value
974+
if configureResp.Capabilities.ChunkSize == 0 || configureResp.Capabilities.ChunkSize > backendPluggable.MaxStateStoreChunkSize {
975+
diag := &tfplugin6.Diagnostic{
976+
Severity: tfplugin6.Diagnostic_ERROR,
977+
Summary: "Failed to negotiate acceptable chunk size",
978+
Detail: fmt.Sprintf("Expected size > 0 and <= %d bytes, provider wants %d bytes",
979+
backendPluggable.MaxStateStoreChunkSize, configureResp.Capabilities.ChunkSize),
980+
}
981+
resp.Diagnostics = append(resp.Diagnostics, diag)
982+
return resp, nil
983+
}
984+
p.chunkSize = configureResp.Capabilities.ChunkSize
985+
986+
resp.Diagnostics = convert.AppendProtoDiag(resp.Diagnostics, configureResp.Diagnostics)
987+
resp.Capabilities = &tfplugin6.StateStoreServerCapabilities{
988+
ChunkSize: configureResp.Capabilities.ChunkSize,
989+
}
990+
return resp, nil
910991
}
911992

912993
func (p *provider6) ReadStateBytes(req *tfplugin6.ReadStateBytes_Request, srv tfplugin6.Provider_ReadStateBytesServer) error {
913-
panic("not implemented")
994+
stateReadResp := p.provider.ReadStateBytes(providers.ReadStateBytesRequest{
995+
TypeName: req.TypeName,
996+
StateId: req.StateId,
997+
})
998+
999+
state := stateReadResp.Bytes
1000+
reader := bytes.NewReader(state)
1001+
totalLength := reader.Size() // length in bytes
1002+
rangeStart := 0
1003+
1004+
for {
1005+
var diags []*proto6.Diagnostic
1006+
readBytes := make([]byte, p.chunkSize)
1007+
byteCount, err := reader.Read(readBytes)
1008+
if err != nil && !errors.Is(err, io.EOF) {
1009+
diags := []*proto6.Diagnostic{
1010+
{
1011+
Severity: proto6.Diagnostic_ERROR,
1012+
Summary: "Error reading from state file",
1013+
Detail: fmt.Sprintf("PSS experienced an error when reading from the state file for workspace %s: %s",
1014+
req.StateId,
1015+
err,
1016+
),
1017+
},
1018+
}
1019+
err := srv.Send(&proto6.ReadStateBytes_Response{
1020+
// Zero values accompany the error diagnostic
1021+
Bytes: nil,
1022+
TotalLength: 0,
1023+
Range: &proto6.StateRange{
1024+
Start: 0,
1025+
End: 0,
1026+
},
1027+
Diagnostics: diags,
1028+
})
1029+
if err != nil {
1030+
return err
1031+
}
1032+
}
1033+
1034+
if byteCount == 0 {
1035+
// The previous iteration read the last byte of the data.
1036+
return nil
1037+
}
1038+
1039+
err = srv.Send(&proto6.ReadStateBytes_Response{
1040+
Bytes: readBytes[0:byteCount],
1041+
TotalLength: int64(totalLength),
1042+
Range: &proto6.StateRange{
1043+
Start: int64(rangeStart),
1044+
End: int64(rangeStart + byteCount),
1045+
},
1046+
Diagnostics: diags,
1047+
})
1048+
if err != nil {
1049+
return err
1050+
}
1051+
1052+
// Track progress to ensure Range values are correct.
1053+
rangeStart += byteCount
1054+
}
9141055
}
9151056

9161057
func (p *provider6) WriteStateBytes(srv tfplugin6.Provider_WriteStateBytesServer) error {
917-
panic("not implemented")
1058+
var typeName string
1059+
var stateId string
1060+
1061+
state := bytes.Buffer{}
1062+
var grpcErr error
1063+
var totalReceivedBytes int
1064+
var expectedTotalLength int64
1065+
for {
1066+
chunk, err := srv.Recv()
1067+
if err == io.EOF {
1068+
break
1069+
}
1070+
if err != nil {
1071+
grpcErr = fmt.Errorf("wrapped err: %w", err)
1072+
break
1073+
}
1074+
if expectedTotalLength == 0 {
1075+
// On the first iteration
1076+
expectedTotalLength = chunk.TotalLength // record expected length
1077+
if chunk.Meta != nil {
1078+
// We expect the Meta to be set on the first message, only
1079+
typeName = chunk.Meta.TypeName
1080+
stateId = chunk.Meta.StateId
1081+
} else {
1082+
panic("expected Meta to be set on first chunk sent to WriteStateBytes")
1083+
}
1084+
}
1085+
1086+
n, err := state.Write(chunk.Bytes)
1087+
if err != nil {
1088+
return fmt.Errorf("error writing state: %w", err)
1089+
}
1090+
totalReceivedBytes += n
1091+
}
1092+
1093+
if grpcErr != nil {
1094+
return grpcErr
1095+
}
1096+
1097+
if int64(totalReceivedBytes) != expectedTotalLength {
1098+
return fmt.Errorf("expected to receive state in %d bytes, actually received %d bytes", expectedTotalLength, totalReceivedBytes)
1099+
}
1100+
1101+
if totalReceivedBytes == 0 {
1102+
// Even an empty state file has content; no bytes is not valid
1103+
return errors.New("No state data received from Terraform: No state data was received from Terraform. This is a bug and should be reported.")
1104+
}
1105+
1106+
resp := p.provider.WriteStateBytes(providers.WriteStateBytesRequest{
1107+
StateId: stateId,
1108+
TypeName: typeName,
1109+
Bytes: state.Bytes(),
1110+
})
1111+
1112+
err := srv.SendAndClose(&proto6.WriteStateBytes_Response{
1113+
Diagnostics: convert.AppendProtoDiag([]*proto6.Diagnostic{}, resp.Diagnostics),
1114+
})
1115+
1116+
return err
9181117
}
9191118

9201119
func (p *provider6) LockState(ctx context.Context, req *tfplugin6.LockState_Request) (*tfplugin6.LockState_Response, error) {
921-
panic("not implemented")
1120+
lockResp := p.provider.LockState(providers.LockStateRequest{
1121+
TypeName: req.TypeName,
1122+
StateId: req.StateId,
1123+
Operation: req.Operation,
1124+
})
1125+
1126+
resp := &tfplugin6.LockState_Response{
1127+
LockId: lockResp.LockId,
1128+
Diagnostics: convert.AppendProtoDiag([]*proto6.Diagnostic{}, lockResp.Diagnostics),
1129+
}
1130+
1131+
return resp, nil
9221132
}
9231133

9241134
func (p *provider6) UnlockState(ctx context.Context, req *tfplugin6.UnlockState_Request) (*tfplugin6.UnlockState_Response, error) {
925-
panic("not implemented")
1135+
unlockResp := p.provider.UnlockState(providers.UnlockStateRequest{
1136+
TypeName: req.TypeName,
1137+
StateId: req.StateId,
1138+
LockId: req.LockId,
1139+
})
1140+
1141+
resp := &tfplugin6.UnlockState_Response{
1142+
Diagnostics: convert.AppendProtoDiag([]*proto6.Diagnostic{}, unlockResp.Diagnostics),
1143+
}
1144+
1145+
return resp, nil
9261146
}
9271147

9281148
func (p *provider6) GetStates(ctx context.Context, req *tfplugin6.GetStates_Request) (*tfplugin6.GetStates_Response, error) {
929-
panic("not implemented")
1149+
getStatesResp := p.provider.GetStates(providers.GetStatesRequest{
1150+
TypeName: req.TypeName,
1151+
})
1152+
1153+
resp := &tfplugin6.GetStates_Response{
1154+
StateId: getStatesResp.States,
1155+
Diagnostics: convert.AppendProtoDiag([]*tfplugin6.Diagnostic{}, getStatesResp.Diagnostics),
1156+
}
1157+
1158+
return resp, nil
9301159
}
9311160

9321161
func (p *provider6) DeleteState(ctx context.Context, req *tfplugin6.DeleteState_Request) (*tfplugin6.DeleteState_Response, error) {
933-
panic("not implemented")
1162+
deleteStatesResp := p.provider.DeleteState(providers.DeleteStateRequest{
1163+
TypeName: req.TypeName,
1164+
})
1165+
1166+
resp := &tfplugin6.DeleteState_Response{
1167+
Diagnostics: convert.AppendProtoDiag([]*tfplugin6.Diagnostic{}, deleteStatesResp.Diagnostics),
1168+
}
1169+
1170+
return resp, nil
9341171
}
9351172

9361173
func (p *provider6) PlanAction(_ context.Context, req *tfplugin6.PlanAction_Request) (*tfplugin6.PlanAction_Response, error) {

0 commit comments

Comments
 (0)