从Threading.Timer启动的任务

从Threading.Timer启动的任务

问题描述:

*编辑:请参阅下面的解答我的答案。从Threading.Timer启动的任务

以下是否有危险?我试图追查我认为可能是一种竞争条件。我想我会从这里开始并从那里开始。

private BlockingCollection<MyTaskType>_MainQ = new BlockingCollection<MyTaskType>(); 
private void Start() 
{ 
    _CheckTask = new Timer(new TimerCallback(CheckTasks), null, 10, 5000); 
} 

private void CheckTasks(object state) 
{ 
    _CheckTask.Change(Timeout.Infinite, Timeout.Infinite); 
    GetTask(); 
    _CheckTask.Change(5000,5000); 
} 

private void GetTask() 
{ 
    //get task from database to object 
    Task.Factory.StartNew(delegate { 
    AddToWorkQueue(); //this adds to _MainQ which is a BlockingCollection 
    }); 
} 

private void AddToWorkQueue() 
{ 
    //do some stuff to get stuff to move 
    _MainQ.Add(dataobject); 
} 

编辑:我也使用静态类来处理写入数据库。每个调用都应该有自己独特的数据,从多个线程调用,因此它不共享数据。你认为这可能是争议的来源吗?下面

代码:

public static void ExecuteNonQuery(string connectionString, string sql, CommandType commandType, List<FastSqlParam> paramCollection = null, int timeout = 60) 
{ 
    //Console.WriteLine("{0} [Thread {1}] called ExecuteNonQuery", DateTime.Now.ToString("HH:mm:ss:ffffff"), System.Threading.Thread.CurrentThread.ManagedThreadId); 
    using (SqlConnection connection = new SqlConnection(connectionString)) 
    using (SqlCommand command = new SqlCommand(sql, connection)) 
    { 
    try 
    { 
     if (paramCollection != null) 
     { 
     foreach (FastSqlParam fsqlParam in paramCollection) 
     { 
      try 
      { 
      SqlParameter param = new SqlParameter(); 
      param.Direction = fsqlParam.ParamDirection; 
      param.Value = fsqlParam.ParamValue; 
      param.ParameterName = fsqlParam.ParamName; 
      param.SqlDbType = fsqlParam.ParamType; 
      command.Parameters.Add(param); 
      } 
      catch (ArgumentNullException anx) 
      { 
      throw new Exception("Parameter value was null", anx); 
      } 
      catch (InvalidCastException icx) 
      { 
      throw new Exception("Could not cast parameter value", icx); 
      } 
     } 
     } 

     connection.Open(); 
     command.CommandType = commandType; 
     command.CommandTimeout = timeout; 
     command.ExecuteNonQuery(); 

     if (paramCollection != null) 
     { 
     foreach (FastSqlParam fsqlParam in paramCollection) 
     { 
      if (fsqlParam.ParamDirection == ParameterDirection.InputOutput || fsqlParam.ParamDirection == ParameterDirection.Output) 
      try 
      { 
       fsqlParam.ParamValue = command.Parameters[fsqlParam.ParamName].Value; 
      } 
      catch (ArgumentNullException anx) 
      { 
       throw new Exception("Output parameter value was null", anx); 
      } 
      catch (InvalidCastException icx) 
      { 
       throw new Exception("Could not cast parameter value", icx); 
      } 
     } 
     } 
    } 
    catch (SqlException ex) 
    { 
     throw ex; 
    } 
    catch (ArgumentException ex) 
    { 
     throw ex; 
    } 
    } 
} 

每个请求:

FastSql.ExecuteNonQuery(connectionString, "someProc", System.Data.CommandType.StoredProcedure, new List<FastSqlParam>() { new FastSqlParam(SqlDbType.Int, "@SomeParam", variable)}); 

同时,我想指出,这个代码似乎从VS2010 [调试或发行]随机运行它失败。当我执行发布版本时,在将托管它的开发服务器上运行安装程序,应用程序未能崩溃并且运行平稳。

每个请求:

当前线程架构:

  1. 线程从数据库调度表
  2. 线程A的读出1结果,如果返回的行,启动一个Task登录到资源来查看是否有要传输的文件。该任务引用的对象包含使用静态调用创建的DataTable中的数据。基本如下。
  3. 如果有找到的文件,Task增加_MainQ要移动的文件

    //Called from Thread A 
    void ProcessTask() 
    { 
        var parameters = new List<FastSqlParam>() { new FastSqlParam(SqlDbType.Int, "@SomeParam", variable) }; 
        using (DataTable someTable = FastSql.ExecuteDataTable(connectionString, "someProc", CommandType.StoredProcedure, parameters)) 
        { 
         SomeTask task = new Task(); 
    
          //assign task some data from dt.Rows[0] 
    
          if (task != null) 
          { 
           Task.Factory.StartNew(delegate { AddFilesToQueue(task); }); 
          } 
         } 
        } 
    
    void AddFilesToQueue(Task task) 
    { 
        //connect to remote system and build collection of files to WorkItem 
        //e.g, WorkItem will have a collection of collections to transfer. We control this throttling mechanism to allow more threads to split up the work 
        _MainQ.Add(WorkItem); 
    } 
    

你觉得有可能是返回从FastSql.ExecuteDataTable值的问题,因为它是一个静态类,然后使用它与using块?

+0

你能发表一段代码,在队列中调用'Take'吗?另外,显示何时调用ExecuteNonQuery也会很有用。 – 2010-07-23 14:30:00

+0

对不起,我不是很清楚。我真的很想看看在哪个线程上执行ExecuteNonQuery的代码。我假设它是调用'Take'的线程。如果是这种情况,那么你至少有3个线程在这里玩1)调用'Add' 2)定时器回调本身3)和一个调用'Take'。全面了解线程如何交互是识别任何可能竞争条件的唯一可靠方法。 – 2010-07-23 16:22:05

+0

不,我不认为它与返回'FastSql.ExecuteDataTable'是'using'的目标有关。我仍然看不到'BlockingCollection.Take'方法被调用的地方。 – 2010-07-23 20:13:29

事实证明,这个问题是一个非常非常奇怪的问题。

我将原来的解决方案从.NET 3.5解决方案转换为.NET 4.0解决方案。当我在全新的.NET 4中重新创建整个解决方案时,锁定问题就消失了。0解决方案。没有引入其他更改,所以我非常有信心问题是升级到4.0。

我个人很在意在相当多的地方引入额外的线程 - “这里是龙”是一个有用的规则,当涉及到线程的工作!我看不出你有什么问题,但如果它更简单,更确定会更容易。我假设你想让“AddToWorkQueue”的调用在另一个线程中完成(以测试竞争条件),所以我已经把它留下了。

这样做你需要它吗? (眼球汇编可能是错的)


while(true) { 
    Task.Factory.StartNew(delegate { AddToWorkQueue(); }); 
    Thread.Sleep(5000); 
} 

随机放在一旁 - 比较喜欢“扔”; “扔掉”; - 前者保留原始的调用栈,后者只会给你“throw ex”的行号;呼叫。更好的是,在这种情况下,应该省略try/catch,因为你所做的只是重新抛出异常,所以你最好还是自己节省尝试的开销。

+0

是的,这基本上是真的。 我宁愿使用Threading.Timer回调,因为它在CPU上比while(true){sleep}更高效更容易。这里有几条线索提到了这一点。 – 2010-07-22 22:36:12