44 "context"
55 "encoding/json"
66 "fmt"
7+ "time"
78
89 "github.com/goto/entropy/core/module"
910 "github.com/goto/entropy/core/resource"
@@ -32,6 +33,7 @@ type Job struct {
3233
3334type JobsExceptionResponse struct {
3435 RootException string `json:"root-exception"`
36+ Timestamp int64 `json:"timestamp"`
3537}
3638
3739func (dd * daggerDriver ) Output (ctx context.Context , exr module.ExpandedResource ) (json.RawMessage , error ) {
@@ -86,11 +88,15 @@ func (dd *daggerDriver) refreshOutput(ctx context.Context, r resource.Resource,
8688 output .State = state
8789 output .Error = ""
8890
89- jobs , err := dd .getFlinkExceptions (ctx , kubeOut .Configs , rc .Namespace , rc .Name )
91+ exc , err := dd .getFlinkExceptions (ctx , kubeOut .Configs , rc .Namespace , rc .Name )
9092 if err != nil {
91- output .Exceptions = []Exceptions {}
93+ output .Exceptions = Exception {
94+ RootException : "Failed to fetch exceptions: " + err .Error (),
95+ }
96+ }
97+ if exc .RootException != "" {
98+ output .Exceptions = exc
9299 }
93- output .Exceptions = jobs
94100
95101 return modules .MustJSON (output ), nil
96102}
@@ -109,34 +115,37 @@ func (dd *daggerDriver) getKubeResources(ctx context.Context, configs kube.Confi
109115 return pods , crd , nil
110116}
111117
112- func (dd * daggerDriver ) getFlinkExceptions (ctx context.Context , configs kube.Config , namespace , name string ) ([] Exceptions , error ) {
118+ func (dd * daggerDriver ) getFlinkExceptions (ctx context.Context , configs kube.Config , namespace , name string ) (Exception , error ) {
113119 jobsResponseRaw , err := dd .kubeProxyService (ctx , configs , namespace , flinkRestScheme , name + flinkRestServiceNameSuffix , flinkRestServicePort , flinkRestListJobsPath )
114120 if err != nil {
115- return nil , err
121+ return Exception {} , err
116122 }
117123
118124 var jobsOverview JobsOverviewResponse
119125 if err := json .Unmarshal (jobsResponseRaw , & jobsOverview ); err != nil {
120- return nil , errors .ErrInternal .WithMsgf ("failed to unmarshal jobs overview response" ).WithCausef ("%s" , err .Error ())
126+ return Exception {} , errors .ErrInternal .WithMsgf ("failed to unmarshal jobs overview response" ).WithCausef ("%s" , err .Error ())
121127 }
122128
123- jobsExceptions := make ([]Exceptions , len (jobsOverview .Jobs ))
124- for _ , job := range jobsOverview .Jobs {
125- exceptionPath := fmt .Sprintf (flinkRestExceptionPath , job .JobID )
129+ var jobException Exception
130+ if len (jobsOverview .Jobs ) > 0 {
131+ jobId := jobsOverview .Jobs [0 ].JobID
132+ exceptionPath := fmt .Sprintf (flinkRestExceptionPath , jobId )
126133 exceptionResponseRaw , err := dd .kubeProxyService (ctx , configs , namespace , flinkRestScheme , name + flinkRestServiceNameSuffix , flinkRestServicePort , exceptionPath )
127134 if err != nil {
128- return nil , err
135+ return Exception {}, err
136+ }
137+
138+ var JobsExceptionResponse JobsExceptionResponse
139+ if err := json .Unmarshal (exceptionResponseRaw , & JobsExceptionResponse ); err != nil {
140+ return Exception {}, errors .ErrInternal .WithMsgf ("failed to unmarshal jobs exception response for job %s" , jobId ).WithCausef ("%s" , err .Error ())
129141 }
130142
131- var jobException JobsExceptionResponse
132- if err := json .Unmarshal (exceptionResponseRaw , & jobException ); err != nil {
133- return nil , errors .ErrInternal .WithMsgf ("failed to unmarshal jobs exception response for job %s" , job .JobID ).WithCausef ("%s" , err .Error ())
143+ time := time .Unix (JobsExceptionResponse .Timestamp / 1000 , (JobsExceptionResponse .Timestamp % 1000 )* int64 (time .Millisecond ))
144+ jobException = Exception {
145+ RootException : JobsExceptionResponse .RootException ,
146+ Timestamp : & time ,
134147 }
135- jobsExceptions = append (jobsExceptions , Exceptions {
136- JobID : job .JobID ,
137- RootException : jobException .RootException ,
138- })
139148 }
140149
141- return jobsExceptions , nil
150+ return jobException , nil
142151}
0 commit comments