Skip to content

Commit 7785583

Browse files
committed
updatesubtask
1 parent 3095156 commit 7785583

File tree

12 files changed

+359
-288
lines changed

12 files changed

+359
-288
lines changed

gen/dart/lib/services/tasks_svc/v1/task_svc.pb.dart

-14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen/dart/lib/services/tasks_svc/v1/task_svc.pbjson.dart

+1-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen/go/services/tasks_svc/v1/task_svc.pb.go

+173-185
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen/ts/services/tasks_svc/v1/task_svc_pb.d.ts

-11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen/ts/services/tasks_svc/v1/task_svc_pb.js

+1-49
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/services/tasks_svc/v1/task_svc.proto

-1
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,6 @@ message AssignTaskResponse {
219219
message UnassignTaskRequest {
220220
string task_id = 1; // @gotags: validate:"uuid4"
221221
string user_id = 2; // @gotags: validate:"uuid4"
222-
optional string consistency = 3; // no conflict detection, if not provided
223222
}
224223

225224
message UnassignTaskResponse {

services/tasks-svc/internal/task/aggregate/aggregate.go

+12
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
pb "gen/services/tasks_svc/v1"
77
"github.com/google/uuid"
88
"hwes"
9+
"hwutil"
910
taskEventsV1 "tasks-svc/internal/task/events/v1"
1011
"tasks-svc/internal/task/models"
1112
"time"
@@ -50,6 +51,17 @@ func LoadTaskAggregateWithSnapshotAt(ctx context.Context, as hwes.AggregateStore
5051
}
5152

5253
task := *taskAggregate.Task // deref copies model
54+
55+
// also copy pointer values
56+
if task.DueAt != nil {
57+
task.DueAt = hwutil.PtrTo(*task.DueAt)
58+
}
59+
subtasks := make(map[uuid.UUID]models.Subtask)
60+
for key, value := range task.Subtasks {
61+
subtasks[key] = value
62+
}
63+
task.Subtasks = subtasks
64+
5365
snapshot = &task
5466
}
5567

services/tasks-svc/internal/task/api/grpc.go

+62-5
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
pb "gen/services/tasks_svc/v1"
99
"github.com/google/uuid"
1010
zlog "github.com/rs/zerolog"
11-
"github.com/rs/zerolog/log"
1211
"google.golang.org/grpc/codes"
1312
"google.golang.org/grpc/status"
1413
"google.golang.org/protobuf/proto"
@@ -17,6 +16,7 @@ import (
1716
"hwes"
1817
"hwutil"
1918
"tasks-svc/internal/task/handlers"
19+
"tasks-svc/internal/task/models"
2020
"tasks-svc/internal/util"
2121
"time"
2222
)
@@ -203,7 +203,7 @@ func (s *TaskGrpcService) AssignTask(ctx context.Context, req *pb.AssignTaskRequ
203203

204204
for i := 0; true; i++ {
205205
if i > 10 {
206-
log.Warn().Msg("UpdatePatient: conflict circuit breaker triggered")
206+
zlog.Ctx(ctx).Warn().Msg("UpdatePatient: conflict circuit breaker triggered")
207207
return nil, fmt.Errorf("failed conflict resolution")
208208
}
209209

@@ -525,9 +525,66 @@ func (s *TaskGrpcService) UpdateSubtask(ctx context.Context, req *pb.UpdateSubta
525525
return nil, err
526526
}
527527

528-
consistency, err := s.handlers.Commands.V1.UpdateSubtask(ctx, taskID, subtaskID, req.Subtask.Name, req.Subtask.Done)
529-
if err != nil {
530-
return nil, err
528+
expConsistency, ok := hwutil.ParseConsistency(req.TaskConsistency)
529+
if !ok {
530+
return nil, common.UnparsableConsistencyError(ctx, "task_consistency")
531+
}
532+
533+
var consistency uint64
534+
535+
for i := 0; true; i++ {
536+
if i > 10 {
537+
zlog.Ctx(ctx).Warn().Msg("UpdateSubtask: conflict circuit breaker triggered")
538+
return nil, fmt.Errorf("failed conflict resolution")
539+
}
540+
541+
c, conflict, err := s.handlers.Commands.V1.UpdateSubtask(ctx, taskID, subtaskID, req.Subtask.Name, req.Subtask.Done, expConsistency)
542+
if err != nil {
543+
return nil, err
544+
}
545+
consistency = c
546+
547+
if conflict == nil {
548+
break
549+
}
550+
551+
conflicts := make(map[string]*commonpb.AttributeConflict)
552+
553+
var was models.Subtask
554+
if w, ok := conflict.Was.Subtasks[subtaskID]; ok {
555+
was = w
556+
} else {
557+
return nil, fmt.Errorf("subtask did not exist at expConsistency")
558+
}
559+
560+
is := conflict.Is.Subtasks[subtaskID] // handler would have failed if non-existent
561+
562+
// TODO: find a generic approach
563+
nameUpdateRequested := req.Subtask.Name != nil && *req.Subtask.Name != is.Name
564+
nameAlreadyUpdated := was.Name != is.Name
565+
if nameUpdateRequested && nameAlreadyUpdated {
566+
conflicts["name"], err = util.AttributeConflict(
567+
wrapperspb.String(is.Name),
568+
wrapperspb.String(*req.Subtask.Name),
569+
)
570+
if err != nil {
571+
return nil, err
572+
}
573+
}
574+
575+
if len(conflicts) != 0 {
576+
return &pb.UpdateSubtaskResponse{
577+
Conflict: &commonpb.Conflict{ConflictingAttributes: conflicts},
578+
TaskConsistency: strconv.FormatUint(conflict.Consistency, 10),
579+
}, nil
580+
}
581+
582+
// bool done can never cause a problem
583+
// the user expects done = B, and sets it to \neg B
584+
// so either that is the case still, or the update will do nothing anyway
585+
586+
// no conflict? retry with new consistency
587+
expConsistency = &conflict.Consistency
531588
}
532589

533590
return &pb.UpdateSubtaskResponse{

services/tasks-svc/internal/task/commands/v1/assign_task.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,17 @@ type AssignTaskConflict struct {
1515
Is *models.Task
1616
}
1717

18-
type AssignTaskCommandHandler func(ctx context.Context, taskID, userID uuid.UUID, expConsistency *uint64) (common.ConsistencyToken, *AssignTaskConflict, error)
18+
type AssignTaskCommandHandler func(ctx context.Context, taskID, userID uuid.UUID, expConsistency *uint64) (common.ConsistencyToken, *UpdateTaskConflict, error)
1919

2020
func NewAssignTaskCommandHandler(as hwes.AggregateStore) AssignTaskCommandHandler {
21-
return func(ctx context.Context, taskID, userID uuid.UUID, expConsistency *uint64) (common.ConsistencyToken, *AssignTaskConflict, error) {
21+
return func(ctx context.Context, taskID, userID uuid.UUID, expConsistency *uint64) (common.ConsistencyToken, *UpdateTaskConflict, error) {
2222
task, oldState, err := aggregate.LoadTaskAggregateWithSnapshotAt(ctx, as, taskID, expConsistency)
2323
if err != nil {
2424
return 0, nil, err
2525
}
2626

2727
if expConsistency != nil && *expConsistency != task.GetVersion() {
28-
return 0, &AssignTaskConflict{
28+
return 0, &UpdateTaskConflict{
2929
Consistency: task.GetVersion(),
3030
Was: oldState,
3131
Is: task.Task,

services/tasks-svc/internal/task/commands/v1/update_subtask.go

+17-8
Original file line numberDiff line numberDiff line change
@@ -9,32 +9,41 @@ import (
99
"tasks-svc/internal/task/aggregate"
1010
)
1111

12-
type UpdateSubtaskCommandHandler func(ctx context.Context, taskID, subtaskID uuid.UUID, name *string, done *bool) (common.ConsistencyToken, error)
12+
type UpdateSubtaskCommandHandler func(ctx context.Context, taskID, subtaskID uuid.UUID, name *string, done *bool, expConsistency *uint64) (common.ConsistencyToken, *UpdateTaskConflict, error)
1313

1414
func NewUpdateSubtaskCommandHandler(as hwes.AggregateStore) UpdateSubtaskCommandHandler {
15-
return func(ctx context.Context, taskID, subtaskID uuid.UUID, name *string, done *bool) (common.ConsistencyToken, error) {
16-
a, err := aggregate.LoadTaskAggregate(ctx, as, taskID)
15+
return func(ctx context.Context, taskID, subtaskID uuid.UUID, name *string, done *bool, expConsistency *uint64) (common.ConsistencyToken, *UpdateTaskConflict, error) {
16+
a, oldState, err := aggregate.LoadTaskAggregateWithSnapshotAt(ctx, as, taskID, expConsistency)
1717
if err != nil {
18-
return 0, err
18+
return 0, nil, err
1919
}
2020

2121
currentSubtask, found := a.Task.Subtasks[subtaskID]
2222
if !found {
23-
return 0, fmt.Errorf("subtask with ID: %s not found on Task with ID: %s", subtaskID, taskID)
23+
return 0, nil, fmt.Errorf("subtask with ID: %s not found on Task with ID: %s", subtaskID, taskID)
24+
}
25+
26+
if expConsistency != nil && *expConsistency != a.GetVersion() {
27+
return 0, &UpdateTaskConflict{
28+
Consistency: a.GetVersion(),
29+
Was: oldState,
30+
Is: a.Task,
31+
}, err
2432
}
2533

2634
if name != nil && *name != currentSubtask.Name {
2735
if err := a.UpdateSubtaskName(ctx, subtaskID, *name); err != nil {
28-
return 0, err
36+
return 0, nil, err
2937
}
3038
}
3139

3240
if done != nil && *done != currentSubtask.Done {
3341
if err := a.UpdateSubtaskDone(ctx, subtaskID, *done); err != nil {
34-
return 0, err
42+
return 0, nil, err
3543
}
3644
}
3745

38-
return as.Save(ctx, a)
46+
consistency, err := as.Save(ctx, a)
47+
return consistency, nil, err
3948
}
4049
}

services/tasks-svc/stories/TaskCRUD_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,8 @@ func TestCreateUpdateGetTask(t *testing.T) {
201201
//
202202

203203
unassignRes, err := taskClient.UnassignTask(ctx, &pb.UnassignTaskRequest{
204-
TaskId: taskId,
205-
UserId: assignedUser.String(),
206-
Consistency: &task.Consistency,
204+
TaskId: taskId,
205+
UserId: assignedUser.String(),
207206
})
208207
assert.NoError(t, err)
209208

0 commit comments

Comments
 (0)