Merge pull request #85 from datomia/fix-pool-leak

Fix memory leak
This commit is contained in:
Lei Xue
2019-06-30 18:57:44 +08:00
committed by GitHub
4 changed files with 41 additions and 61 deletions

View File

@@ -27,7 +27,6 @@ import (
"github.com/gostor/gotgt/pkg/config" "github.com/gostor/gotgt/pkg/config"
"github.com/gostor/gotgt/pkg/scsi" "github.com/gostor/gotgt/pkg/scsi"
"github.com/gostor/gotgt/pkg/util" "github.com/gostor/gotgt/pkg/util"
"github.com/gostor/gotgt/pkg/util/pool"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@@ -283,7 +282,7 @@ func (s *ISCSITargetDriver) rxHandler(conn *iscsiConnection) {
return return
} }
dl := ((cmd.DataLen + DataPadding - 1) / DataPadding) * DataPadding dl := ((cmd.DataLen + DataPadding - 1) / DataPadding) * DataPadding
cmd.RawData = pool.NewBuffer(dl) cmd.RawData = make([]byte, int(dl))
length := 0 length := 0
for length < dl { for length < dl {
l, err := conn.readData(cmd.RawData[length:]) l, err := conn.readData(cmd.RawData[length:])
@@ -608,7 +607,7 @@ func (s *ISCSITargetDriver) scsiCommandHandler(conn *iscsiConnection) (err error
} }
scmd.OutSDBBuffer = &api.SCSIDataBuffer{ scmd.OutSDBBuffer = &api.SCSIDataBuffer{
Length: uint32(blen), Length: uint32(blen),
Buffer: pool.NewBuffer(blen), Buffer: make([]byte, blen),
} }
} }
log.Debugf("SCSI write, R2T count: %d, unsol Count: %d, offset: %d", task.r2tCount, task.unsolCount, task.offset) log.Debugf("SCSI write, R2T count: %d, unsol Count: %d, offset: %d", task.r2tCount, task.unsolCount, task.offset)
@@ -621,7 +620,9 @@ func (s *ISCSITargetDriver) scsiCommandHandler(conn *iscsiConnection) (err error
// prepare to receive more data // prepare to receive more data
conn.session.ExpCmdSN += 1 conn.session.ExpCmdSN += 1
task.state = taskPending task.state = taskPending
conn.session.PendingTasksMutex.Lock()
conn.session.PendingTasks.Push(task) conn.session.PendingTasks.Push(task)
conn.session.PendingTasksMutex.Unlock()
conn.rxTask = task conn.rxTask = task
if conn.session.SessionParam[ISCSI_PARAM_INITIAL_R2T_EN].Value == 1 { if conn.session.SessionParam[ISCSI_PARAM_INITIAL_R2T_EN].Value == 1 {
iscsiExecR2T(conn) iscsiExecR2T(conn)
@@ -636,7 +637,7 @@ func (s *ISCSITargetDriver) scsiCommandHandler(conn *iscsiConnection) (err error
} else if scmd.InSDBBuffer == nil { } else if scmd.InSDBBuffer == nil {
scmd.InSDBBuffer = &api.SCSIDataBuffer{ scmd.InSDBBuffer = &api.SCSIDataBuffer{
Length: uint32(req.ExpectedDataLen), Length: uint32(req.ExpectedDataLen),
Buffer: pool.NewBuffer(int(req.ExpectedDataLen)), Buffer: make([]byte, int(req.ExpectedDataLen)),
} }
} }
task.offset = 0 task.offset = 0
@@ -664,12 +665,9 @@ func (s *ISCSITargetDriver) scsiCommandHandler(conn *iscsiConnection) (err error
} }
case OpSCSIOut: case OpSCSIOut:
log.Debugf("iSCSI Data-out processing...") log.Debugf("iSCSI Data-out processing...")
var task *iscsiTask conn.session.PendingTasksMutex.RLock()
for _, t := range conn.session.PendingTasks { task := conn.session.PendingTasks.GetByTag(conn.req.TaskTag)
if t.tag == conn.req.TaskTag { conn.session.PendingTasksMutex.RUnlock()
task = t
}
}
if task == nil { if task == nil {
err = fmt.Errorf("Cannot find iSCSI task with tag[%v]", conn.req.TaskTag) err = fmt.Errorf("Cannot find iSCSI task with tag[%v]", conn.req.TaskTag)
log.Error(err) log.Error(err)
@@ -705,6 +703,9 @@ func (s *ISCSITargetDriver) scsiCommandHandler(conn *iscsiConnection) (err error
} else { } else {
conn.buildRespPackage(OpSCSIResp, task) conn.buildRespPackage(OpSCSIResp, task)
conn.rxTask = nil conn.rxTask = nil
conn.session.PendingTasksMutex.Lock()
conn.session.PendingTasks.RemoveByTag(conn.req.TaskTag)
conn.session.PendingTasksMutex.Unlock()
} }
case OpNoopOut: case OpNoopOut:
iscsiExecNoopOut(conn) iscsiExecNoopOut(conn)
@@ -745,11 +746,12 @@ func (s *ISCSITargetDriver) iscsiTaskQueueHandler(task *iscsiTask) error {
if err := s.iscsiExecTask(task); err != nil { if err := s.iscsiExecTask(task); err != nil {
log.Error(err) log.Error(err)
} }
if len(sess.PendingTasks) == 0 { sess.PendingTasksMutex.Lock()
if sess.PendingTasks.Len() == 0 {
sess.PendingTasksMutex.Unlock()
return nil return nil
} }
sess.PendingTasksMutex.Lock() task = sess.PendingTasks.Pop()
task = sess.PendingTasks.Pop().(*iscsiTask)
cmd = task.cmd cmd = task.cmd
if cmd.CmdSN != cmdsn { if cmd.CmdSN != cmdsn {
sess.PendingTasks.Push(task) sess.PendingTasks.Push(task)
@@ -796,15 +798,8 @@ func (s *ISCSITargetDriver) iscsiExecTask(task *iscsiTask) error {
sess := task.conn.session sess := task.conn.session
switch cmd.TaskFunc { switch cmd.TaskFunc {
case ISCSI_TM_FUNC_ABORT_TASK: case ISCSI_TM_FUNC_ABORT_TASK:
var stask *iscsiTask
sess.PendingTasksMutex.Lock() sess.PendingTasksMutex.Lock()
for i, t := range sess.PendingTasks { stask := sess.PendingTasks.RemoveByTag(cmd.ReferencedTaskTag)
if cmd.ReferencedTaskTag == t.tag {
stask = sess.PendingTasks[i]
sess.PendingTasks = append(sess.PendingTasks[:i], sess.PendingTasks[i+1:]...)
break
}
}
sess.PendingTasksMutex.Unlock() sess.PendingTasksMutex.Unlock()
if stask == nil { if stask == nil {
task.result = ISCSI_TMF_RSP_NO_TASK task.result = ISCSI_TMF_RSP_NO_TASK

View File

@@ -273,12 +273,12 @@ func (tq taskQueue) Swap(i, j int) {
tq[i], tq[j] = tq[j], tq[i] tq[i], tq[j] = tq[j], tq[i]
} }
func (tq *taskQueue) Push(x interface{}) { func (tq *taskQueue) Push(x *iscsiTask) {
item := x.(*iscsiTask) item := x
*tq = append(*tq, item) *tq = append(*tq, item)
} }
func (tq *taskQueue) Pop() interface{} { func (tq *taskQueue) Pop() *iscsiTask {
old := *tq old := *tq
n := len(old) n := len(old)
item := old[n-1] item := old[n-1]
@@ -286,6 +286,26 @@ func (tq *taskQueue) Pop() interface{} {
return item return item
} }
func (tq taskQueue) GetByTag(tag uint32) *iscsiTask {
for _, t := range tq {
if t.tag == tag {
return t
}
}
return nil
}
func (tq *taskQueue) RemoveByTag(tag uint32) *iscsiTask {
old := *tq
for i, t := range old {
if t.tag == tag {
*tq = append(old[:i], old[i+1:]...)
return t
}
}
return nil
}
func (s *ISCSITargetDriver) LookupISCSISession(tgtName string, iniName string, isid uint64, tsih uint16, tpgt uint16) *ISCSISession { func (s *ISCSITargetDriver) LookupISCSISession(tgtName string, iniName string, isid uint64, tsih uint16, tpgt uint16) *ISCSISession {
var ( var (
tgt *ISCSITarget tgt *ISCSITarget

View File

@@ -23,7 +23,6 @@ import (
"github.com/gostor/gotgt/pkg/api" "github.com/gostor/gotgt/pkg/api"
"github.com/gostor/gotgt/pkg/util" "github.com/gostor/gotgt/pkg/util"
"github.com/gostor/gotgt/pkg/util/pool"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@@ -110,7 +109,7 @@ func bsPerformCommand(bs api.BackingStore, cmd *api.SCSICommand) (err error, key
// TODO // TODO
break break
case api.READ_6, api.READ_10, api.READ_12, api.READ_16: case api.READ_6, api.READ_10, api.READ_12, api.READ_16:
rbuf = pool.NewBuffer(int(tl)) rbuf = make([]byte, int(tl))
rbuf, err = bs.Read(int64(offset), tl) rbuf, err = bs.Read(int64(offset), tl)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
key = MEDIUM_ERROR key = MEDIUM_ERROR
@@ -179,7 +178,7 @@ write:
} }
verify: verify:
if doVerify { if doVerify {
rbuf = pool.NewBuffer(int(tl)) rbuf = make([]byte, int(tl))
rbuf, err = bs.Read(int64(offset), tl) rbuf, err = bs.Read(int64(offset), tl)
if err != nil { if err != nil {
key = MEDIUM_ERROR key = MEDIUM_ERROR

View File

@@ -1,34 +0,0 @@
/*
Copyright 2017 The GoStor Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package pool provides memory pool for buffer.
package pool
import "sync"
var bytePool sync.Pool = sync.Pool{}
func NewBuffer(size int) []byte {
bytePool.New = func() interface{} {
return make([]byte, size)
}
return bytePool.Get().([]byte)
}
func ReleaseBuffer(b []byte) {
bytePool.Put(b)
}