2017-05-24 104 views
2

我正在玩任務,並且我不知道如何在拋出異常時取消任務。我正在嘗試TaskCompletionSourcecancel,但它不起作用。取消任務正在等待

當拋出異常時,有沒有辦法完全停止任務?

public async Task Migrate() 
{ 
    var tcs = new TaskCompletionSource<bool>(); 
    if (IsDataMigratorItemParameteresCorrect()) 
    { 
     try 
     { 
      using (SqlConnection srcconnection = this.SourceConnection) 
      { 
       await srcconnection.OpenAsync(); 
       using (SqlCommand srccmd = CreateMigrateTaskCommand(this.SourceForHash, srcconnection)) 
       { 
        using (SqlDataReader srcreader = await srccmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) 
        { 
         using (SqlConnection destconnection = this.DestinationConnection) 
         { 
          await destconnection.OpenAsync(); 

          await CreateStageTable(destconnection, srcreader); 
          await BulkCopy(destconnection, srcreader); 
          await Merge(destconnection, srcreader); 
          await DropTable(destconnection); 
         } 
        } 
       } 

      } 
     } 

     catch (Exception ex) 
     { 
      Console.WriteLine(DestinationCommand + " canceled\nException : " + ex.Message); 
      tcs.SetCanceled(); 
     } 
    } 
} 

private SqlCommand CreateMigrateTaskCommand(string commandQuery, SqlConnection connection) 
{ 
    SqlCommand scmd = new SqlCommand(commandQuery, connection); 
    return scmd; 
} 

private async Task Merge(SqlConnection conn, SqlDataReader reader) 
{ 
    using (SqlCommand cmdMerge = conn.CreateCommand()) 
    { 
     string mergeQuery = SqlGenerator.BuildMergeQuery(reader, DestinationCommand, this.SourceUniqueKey, this.DestinationUniqueKey); 

     cmdMerge.CommandText = mergeQuery; 
     cmdMerge.CommandType = CommandType.Text; 
     cmdMerge.CommandTimeout = 0; 
     await cmdMerge.ExecuteNonQueryAsync(); 
     Console.WriteLine("{0} - MERGE START\n", DestinationCommand); 
    } 
    Console.WriteLine("{0} - MERGE SUCCESS!\n", DestinationCommand); 
} 

private async Task BulkCopy(SqlConnection conn, SqlDataReader reader) 
{ 
    using (SqlBulkCopy bcp = new SqlBulkCopy(conn.ConnectionString, SqlBulkCopyOptions.TableLock)) //table lock 
    { 
     bcp.SqlRowsCopied += bcp_SqlRowsCopied; 
     bcp.BatchSize = this.BatchSize; 
     bcp.EnableStreaming = true; 
     bcp.BulkCopyTimeout = 0; 
     bcp.NotifyAfter = 1000; 

     string destTable = this.DestinationCommand + "_stage"; 
     bcp.DestinationTableName = destTable; 

     await bcp.WriteToServerAsync(reader); 
    } 
} 

private async Task CreateStageTable(SqlConnection conn, SqlDataReader reader) 
{ 
    using (SqlCommand cmdCreateTable = conn.CreateCommand()) 
    { 
     try 
     { 
      string createTableQuery = SqlGenerator.BuildCreateTableQuery(reader, this.DestinationCommand); 

      cmdCreateTable.CommandText = createTableQuery; 
      cmdCreateTable.CommandType = CommandType.Text; 

      await cmdCreateTable.ExecuteNonQueryAsync(); 
      Console.WriteLine(this.DestinationCommand + " created!"); 
     } 
     catch (Exception ex) 
     { 
      Console.WriteLine(ex.Message); 
     } 
    } 
} 

private async Task DropTable(SqlConnection conn) 
{ 
    using (SqlCommand cmdCreateTable = conn.CreateCommand()) 
    { 
     try 
     { 
      string tableName = DestinationCommand + "_stage"; 
      var builder = new SqlCommandBuilder(); 
      string escapedTableName = builder.QuoteIdentifier(tableName); 
      cmdCreateTable.CommandText = "drop table " + escapedTableName; 
      Console.WriteLine(escapedTableName + " created!"); 
     } 
     catch (Exception ex) 
     { 
      Console.WriteLine(DestinationCommand + " canceled\nException : " + ex.Message); 
      tcs.SetCanceled(); 
     } 
    } 
} 
+0

您的代碼示例不完整。 –

+0

我的不好。現在編輯:) –

回答

4

有兩個部分,以cancellation:請求所述消除(TaskCompletionSource.SetCanceled)並響應該取消請求。

您的代碼調用SetCanceled發生異常時,如果要取消時一個有異常其他操作是確定。

但是,代碼中沒有任何內容正在偵聽取消令牌。它應該被傳遞到支持CancellationToken的API,或者定期檢查(CancellationToken.ThrowIfCancellationRequested),或者附加取消回調(CancellationToken.Register)。否則,這只是一個設置的標誌,沒有人正在檢查它。