diff --git a/mockresponses.go b/mockresponses.go index 9446ac8e5..bd2902b1d 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -115,7 +115,7 @@ func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, descrip func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader { request := reqBody.(*DescribeGroupsRequest) - response := &DescribeGroupsResponse{} + response := &DescribeGroupsResponse{Version: request.version()} for _, requestedGroup := range request.Groups { if group, ok := m.groups[requestedGroup]; ok { response.Groups = append(response.Groups, group) @@ -411,7 +411,7 @@ func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *M func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*ConsumerMetadataRequest) group := req.ConsumerGroup - res := &ConsumerMetadataResponse{} + res := &ConsumerMetadataResponse{Version: req.version()} v := mr.coordinators[group] switch v := v.(type) { case *MockBroker: @@ -459,8 +459,7 @@ func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*FindCoordinatorRequest) - res := &FindCoordinatorResponse{} - res.Version = req.Version + res := &FindCoordinatorResponse{Version: req.version()} var v interface{} switch req.CoordinatorType { case CoordinatorGroup: @@ -508,7 +507,7 @@ func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int3 func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*OffsetCommitRequest) group := req.ConsumerGroup - res := &OffsetCommitResponse{} + res := &OffsetCommitResponse{Version: req.version()} for topic, partitions := range req.blocks { for partition := range partitions { res.AddError(topic, partition, mr.getError(group, topic, partition)) @@ -565,7 +564,10 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*ProduceRequest) res := &ProduceResponse{ - Version: mr.version, + Version: req.version(), + } + if mr.version > 0 { + res.Version = mr.version } for topic, partitions := range req.records { for partition := range partitions { @@ -678,7 +680,7 @@ func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse { func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*DeleteTopicsRequest) - res := &DeleteTopicsResponse{} + res := &DeleteTopicsResponse{Version: req.version()} res.TopicErrorCodes = make(map[string]KError) for _, topic := range req.Topics { @@ -703,7 +705,7 @@ func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsRespon func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*CreatePartitionsRequest) - res := &CreatePartitionsResponse{} + res := &CreatePartitionsResponse{Version: req.version()} res.TopicPartitionErrors = make(map[string]*TopicPartitionError) for topic := range req.TopicPartitions { @@ -731,7 +733,7 @@ func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartit func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*AlterPartitionReassignmentsRequest) _ = req - res := &AlterPartitionReassignmentsResponse{} + res := &AlterPartitionReassignmentsResponse{Version: req.version()} return res } @@ -746,7 +748,7 @@ func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitio func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*ListPartitionReassignmentsRequest) _ = req - res := &ListPartitionReassignmentsResponse{} + res := &ListPartitionReassignmentsResponse{Version: req.version()} for topic, partitions := range req.blocks { for _, partition := range partitions { @@ -767,7 +769,7 @@ func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse { func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*DeleteRecordsRequest) - res := &DeleteRecordsResponse{} + res := &DeleteRecordsResponse{Version: req.version()} res.Topics = make(map[string]*DeleteRecordsResponseTopic) for topic, deleteRecordRequestTopic := range req.Topics { @@ -913,7 +915,7 @@ func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse { func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*AlterConfigsRequest) - res := &AlterConfigsResponse{} + res := &AlterConfigsResponse{Version: req.version()} for _, r := range req.Resources { res.Resources = append(res.Resources, &AlterConfigsResourceResponse{ @@ -935,7 +937,7 @@ func NewMockAlterConfigsResponseWithErrorCode(t TestReporter) *MockAlterConfigsR func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*AlterConfigsRequest) - res := &AlterConfigsResponse{} + res := &AlterConfigsResponse{Version: req.version()} for _, r := range req.Resources { res.Resources = append(res.Resources, &AlterConfigsResourceResponse{ @@ -958,7 +960,7 @@ func NewMockIncrementalAlterConfigsResponse(t TestReporter) *MockIncrementalAlte func (mr *MockIncrementalAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*IncrementalAlterConfigsRequest) - res := &IncrementalAlterConfigsResponse{} + res := &IncrementalAlterConfigsResponse{Version: req.version()} for _, r := range req.Resources { res.Resources = append(res.Resources, &AlterConfigsResourceResponse{ @@ -980,7 +982,7 @@ func NewMockIncrementalAlterConfigsResponseWithErrorCode(t TestReporter) *MockIn func (mr *MockIncrementalAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*IncrementalAlterConfigsRequest) - res := &IncrementalAlterConfigsResponse{} + res := &IncrementalAlterConfigsResponse{Version: req.version()} for _, r := range req.Resources { res.Resources = append(res.Resources, &AlterConfigsResourceResponse{ @@ -1003,7 +1005,7 @@ func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse { func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*CreateAclsRequest) - res := &CreateAclsResponse{} + res := &CreateAclsResponse{Version: req.version()} for range req.AclCreations { res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError}) @@ -1021,7 +1023,7 @@ func NewMockCreateAclsResponseWithError(t TestReporter) *MockCreateAclsResponseE func (mr *MockCreateAclsResponseError) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*CreateAclsRequest) - res := &CreateAclsResponse{} + res := &CreateAclsResponse{Version: req.version()} for range req.AclCreations { res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrInvalidRequest}) @@ -1039,7 +1041,7 @@ func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse { func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*DescribeAclsRequest) - res := &DescribeAclsResponse{} + res := &DescribeAclsResponse{Version: req.version()} res.Err = ErrNoError acl := &ResourceAcls{} if req.ResourceName != nil { @@ -1082,11 +1084,12 @@ func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateRespon func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*SaslAuthenticateRequest) - res := &SaslAuthenticateResponse{} - res.Version = req.Version - res.Err = msar.kerror - res.SaslAuthBytes = msar.saslAuthBytes - res.SessionLifetimeMs = msar.sessionLifetimeMs + res := &SaslAuthenticateResponse{ + Version: req.version(), + Err: msar.kerror, + SaslAuthBytes: msar.saslAuthBytes, + SessionLifetimeMs: msar.sessionLifetimeMs, + } return res } @@ -1120,7 +1123,8 @@ func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse { } func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader { - res := &SaslHandshakeResponse{} + req := reqBody.(*SaslHandshakeRequest) + res := &SaslHandshakeResponse{Version: req.version()} res.Err = mshr.kerror res.EnabledMechanisms = mshr.enabledMechanisms return res @@ -1142,7 +1146,7 @@ func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse { func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*DeleteAclsRequest) - res := &DeleteAclsResponse{} + res := &DeleteAclsResponse{Version: req.version()} for range req.Filters { response := &FilterResponse{Err: ErrNoError} @@ -1167,7 +1171,9 @@ func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDelete } func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader { + req := reqBody.(*DeleteGroupsRequest) resp := &DeleteGroupsResponse{ + Version: req.version(), GroupErrorCodes: map[string]KError{}, } for _, group := range m.deletedGroups { @@ -1196,7 +1202,9 @@ func (m *MockDeleteOffsetResponse) SetDeletedOffset(errorCode KError, topic stri } func (m *MockDeleteOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader { + req := reqBody.(*DeleteOffsetsRequest) resp := &DeleteOffsetsResponse{ + Version: req.version(), ErrorCode: m.errorCode, Errors: map[string]map[int32]KError{ m.topic: {m.partition: m.errorPartition}, @@ -1289,8 +1297,10 @@ func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse { } func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader { + req := reqBody.(*LeaveGroupRequest) resp := &LeaveGroupResponse{ - Err: m.Err, + Version: req.version(), + Err: m.Err, } return resp } @@ -1394,7 +1404,9 @@ func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartiti } func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader { + req := reqBody.(*DescribeLogDirsRequest) resp := &DescribeLogDirsResponse{ + Version: req.version(), LogDirs: m.logDirs, } return resp