Azure Functions + Azure Batch: An MP3 Audio Transcoding Solution
Customer requirements
The customer runs an online music streaming service that delivers MP3 tracks at different bitrates to mobile users depending on network conditions, and offers a preview clip of under thirty seconds for songs the user has not purchased. One very clear requirement of such a system is to periodically convert batches of high-bitrate audio files received from the music labels into MP3 files at several lower bitrates, plus a preview file. Because neither the size of each batch nor when it arrives is predictable, keeping a large fleet of transcoding servers deployed year-round would clearly waste resources; but with too few servers, a large batch cannot be converted quickly, which is unacceptable in an internet era where time matters even more than money. This makes a highly elastic, pay-per-use compute platform on the public cloud a very good fit.
Technology selection
The solution combines Azure Functions, Azure Batch, and Azure Blob Storage. All three are PaaS services, so there are no servers to manage and none of the routine patching and security maintenance that servers require.
Solution architecture diagram:
Implementation:
An Azure Function monitors the Blob container for new files. A major strength of Azure Functions is its range of built-in triggers (HTTP Trigger, Blob Trigger, Timer Trigger, Queue Trigger, and so on); here the Blob Trigger is exactly what we need to watch for blob changes.
First, create an Azure Functions project.
Then configure the function to use the Blob Trigger.
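Both functions in this post read their configuration from application settings (environment variables). For local development these can be supplied in local.settings.json; a minimal sketch with placeholder values, using the same setting names the code below reads:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "<function-app-storage-connection-string>",
    "sourceStorageConnection": "<connection-string-of-the-account-holding-the-source-container>",
    "targetStorageConnection": "<connection-string-of-the-account-holding-the-output-container>",
    "outputContainer": "output",
    "bitRates": "192k;128k;64k",
    "batchAccount": "<batch-account-name>",
    "batchAccountUrl": "https://<batch-account>.<region>.batch.azure.com",
    "batchKey": "<batch-account-key>",
    "MaxTaskPerNode": "4",
    "ffmpegversion": "ffmpeg 3.4"
  }
}

In the deployed Function App the same names go into the app's Application settings.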
Create the ListeningBlob function:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Queue;
namespace MS.CSU.mp3encoder
{
    public static class ListeningBlob
    {
        static string key_Convert = Environment.GetEnvironmentVariable("KeyConvert") ?? "-i \"{0}\" -codec:a libmp3lame -b:a {1} \"{2}\" -y";
        static string work_Dir = Path.GetTempPath();
        static string targetStorageConnection = Environment.GetEnvironmentVariable("targetStorageConnection");
        static string sourceStorageConnection = Environment.GetEnvironmentVariable("sourceStorageConnection");
        static string bitRates = Environment.GetEnvironmentVariable("bitRates") ?? "192k;128k;64k";
        static string keyPreview = Environment.GetEnvironmentVariable("keyPreview") ?? "-ss 0 -t 29 -i \"{0}\" \"{1}\"";
        static CloudBlobClient blobOutputClient;
        static string blobOutputContainerName = Environment.GetEnvironmentVariable("outputContainer") ?? "output";
        static CloudBlobContainer blobOutputContainer;
        static CloudBlobClient blobInputClient;
        static CloudBlobContainer blobInputContainer;

        [FunctionName("ListeningBlob")]
        [return: Queue("jobs")]
        public static void Run([BlobTrigger("source/{name}", Connection = "sourceStorageConnection")]Stream myBlob, string name, Uri uri, TraceWriter log)
        {
            AzureBatch batch = new AzureBatch(sourceStorageConnection);
            // Give every audio file its own processing folder to avoid collisions.
            Guid jobId = Guid.NewGuid();
            log.Info($"Job:{jobId},C# Blob trigger function Processed blob\n Name:{name} \n Size: {myBlob.Length} Bytes,Path:{uri.ToString()}");

            // Copy the source blob to the target container and remove it from the
            // monitored container so the function is not triggered again for the same file.
            try
            {
                initBlobClient();
                CloudBlockBlob sourceBlob = blobInputContainer.GetBlockBlobReference($"{name}");
                name = Path.GetFileNameWithoutExtension(name);
                CloudBlockBlob targetBlob = blobOutputContainer.GetBlockBlobReference($"{name}_{jobId}/{name}.mp3");
                targetBlob.StartCopy(sourceBlob);
                sourceBlob.Delete();
                uri = targetBlob.Uri;
            }
            catch (Exception err)
            {
                log.Error($"Error removing the source blob. Err:{err}");
                return;
            }

            List<EncodeJob> jobs = new List<EncodeJob>();
            string url = Uri.EscapeUriString(uri.ToString());
            log.Info($"Bitrates to convert: {bitRates}");
            string[] bitsRateNames = bitRates.Split(';');
            Dictionary<string, bool> status = new Dictionary<string, bool>();

            // Queue one Batch task per target bitrate.
            foreach (var s in bitsRateNames)
            {
                if (string.IsNullOrWhiteSpace(s))
                    continue;
                var job = new EncodeJob()
                {
                    OutputName = $"{name}{s}.mp3",
                    Name = name,
                    Command = string.Format(key_Convert, name, s, $"{name}{s}.mp3"),
                    id = jobId,
                    InputUri = uri
                };
                batch.QueueTask(job);
            }

            // Queue one more task for the 29-second preview clip.
            var previewJob = new EncodeJob()
            {
                Name = name,
                OutputName = $"{name}preview.mp3",
                Command = string.Format(keyPreview, name, $"{name}preview.mp3"),
                InputUri = uri,
                id = jobId,
            };
            batch.QueueTask(previewJob);
            //Directory.Delete($"{work_Dir}\\{jobId}", true);
        }

        static void initBlobClient()
        {
            // Initialize the output storage container.
            CloudStorageAccount storageOutputAccount = CloudStorageAccount.Parse(targetStorageConnection);
            blobOutputClient = storageOutputAccount.CreateCloudBlobClient();
            blobOutputContainer = blobOutputClient.GetContainerReference(blobOutputContainerName);
            blobOutputContainer.CreateIfNotExists();

            // Initialize the monitored input container.
            CloudStorageAccount storageInputAccount = CloudStorageAccount.Parse(sourceStorageConnection);
            blobInputClient = storageInputAccount.CreateCloudBlobClient();
            blobInputContainer = blobInputClient.GetContainerReference("source");
        }
    }
}
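The EncodeJob type passed to AzureBatch.QueueTask is not shown in the original post. A minimal sketch consistent with how it is used above and in the AzureBatch class below (the property names come from those call sites; everything else is an assumption):

using System;

namespace MS.CSU.mp3encoder
{
    // Describes one ffmpeg transcoding task: which blob to read,
    // which ffmpeg arguments to run, and what to call the output file.
    public class EncodeJob
    {
        public Guid id { get; set; }           // Shared by all tasks produced from one source file
        public string Name { get; set; }       // Source file name without extension
        public string OutputName { get; set; } // Name of the MP3 file the task produces
        public string Command { get; set; }    // Fully formatted ffmpeg argument string
        public Uri InputUri { get; set; }      // URI of the source blob downloaded as a resource file
    }
}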
Create a Batch account and note its account name, URL, and key.
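If you prefer the Azure CLI to the portal, the account can be created and its key retrieved roughly as follows (resource names are placeholders):

# Create a resource group and a Batch account, then list the account keys.
az group create --name mp3encoder-rg --location eastasia
az batch account create --resource-group mp3encoder-rg --name mp3encoderbatch --location eastasia
az batch account keys list --resource-group mp3encoder-rg --name mp3encoderbatch
# The account URL has the form https://<account-name>.<region>.batch.azure.com

These values feed the batchAccount, batchAccountUrl, and batchKey settings used by the AzureBatch class.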
Download the latest ffmpeg build from https://ffmpeg.zeranoe.com/, zip ffmpeg.exe on its own, and upload the zip to the Batch account as an application package so the tasks can invoke it.
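The upload can also be scripted with the Azure CLI; a sketch under the assumption of a recent CLI release (flag names have changed across versions; older releases used --application-id and --version). The application id and version must match appPackageId ("ffmpeg") and appPackageVersion ("3.4") in the code below:

az batch application create --resource-group mp3encoder-rg --name mp3encoderbatch --application-name ffmpeg
az batch application package create --resource-group mp3encoder-rg --name mp3encoderbatch --application-name ffmpeg --version-name 3.4 --package-file ffmpeg-3.4-win64-static.zip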
Build the AzureBatch class that calls Azure Batch to run ffmpeg for the conversions:
using Microsoft.Azure.Batch;
using Microsoft.Azure.Batch.Auth;
using Microsoft.Azure.Batch.Common;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace MS.CSU.mp3encoder
{
    public class AzureBatch
    {
        // ffmpeg application package information.
        string env_appPackageInfo = Environment.GetEnvironmentVariable("ffmpegversion") ?? "ffmpeg 3.4";
        string appPackageId = "ffmpeg";
        string appPackageVersion = "3.4";

        // Pool and job constants.
        private const string PoolId = "WinFFmpegPool";
        private const int DedicatedNodeCount = 0;
        private const int LowPriorityNodeCount = 5;
        // VM size used for the transcoding nodes.
        private const string PoolVMSize = "Standard_F2";
        private const string JobName = "WinFFmpegJob";

        string outputStorageConnection;
        string outputContainerName = "output";
        string batchAccount = Environment.GetEnvironmentVariable("batchAccount");
        string batchKey = Environment.GetEnvironmentVariable("batchKey");
        string batchAccountUrl = Environment.GetEnvironmentVariable("batchAccountUrl");
        string strMaxTaskPerNode = Environment.GetEnvironmentVariable("MaxTaskPerNode") ?? "4";
        // Number of tasks each compute node may run concurrently; adjust to the
        // chosen VM size and the nature of the tasks.
        int maxTaskPerNode = 4;

        public AzureBatch(string storageConnection)
        {
            outputStorageConnection = storageConnection;
        }

        // Constructor used to build a Batch object in unit tests.
        public AzureBatch(string storageConnection, string _batchAccount, string _batchAccountUrl, string _batchKey)
        {
            outputStorageConnection = storageConnection;
            batchAccount = _batchAccount;
            batchAccountUrl = _batchAccountUrl;
            batchKey = _batchKey;
            maxTaskPerNode = int.TryParse(strMaxTaskPerNode, out maxTaskPerNode) ? maxTaskPerNode : 4;
            appPackageId = env_appPackageInfo.Split(' ')[0] ?? "ffmpeg";
            appPackageVersion = env_appPackageInfo.Split(' ')[1] ?? "3.4";
        }

        /// <summary>
        /// Returns a shared access signature (SAS) URL providing the specified
        /// permissions to the specified container. The SAS URL provided is valid for 2 hours from
        /// the time this method is called. The container must already exist in Azure Storage.
        /// </summary>
        /// <param name="blobClient">A <see cref="CloudBlobClient"/>.</param>
        /// <param name="containerName">The name of the container for which a SAS URL will be obtained.</param>
        /// <param name="permissions">The permissions granted by the SAS URL.</param>
        /// <returns>A SAS URL providing the specified access to the container.</returns>
        private string GetContainerSasUrl(CloudBlobClient blobClient, string containerName, SharedAccessBlobPermissions permissions)
        {
            // Set the expiry time and permissions for the container access signature. No start time is
            // specified, so the signature becomes valid immediately and expires in 2 hours.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(2),
                Permissions = permissions
            };

            // Generate the shared access signature on the container.
            CloudBlobContainer container = blobClient.GetContainerReference(containerName);
            string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);

            // Return the URL string for the container, including the SAS token.
            return String.Format("{0}{1}", container.Uri, sasContainerToken);
        }

        /// <summary>
        /// Creates the Batch pool.
        /// </summary>
        /// <param name="batchClient">A BatchClient object.</param>
        /// <param name="poolId">ID of the CloudPool object to create.</param>
        private void CreatePoolIfNotExist(BatchClient batchClient, string poolId)
        {
            CloudPool pool = null;
            try
            {
                ImageReference imageReference = new ImageReference(
                    publisher: "MicrosoftWindowsServer",
                    offer: "WindowsServer",
                    sku: "2012-R2-Datacenter-smalldisk",
                    version: "latest");

                VirtualMachineConfiguration virtualMachineConfiguration = new VirtualMachineConfiguration(
                    imageReference: imageReference,
                    nodeAgentSkuId: "batch.node.windows amd64");

                // Create an unbound pool. No pool is actually created in the Batch service until we call
                // CloudPool.Commit(), so we can still modify its properties here.
                pool = batchClient.PoolOperations.CreatePool(
                    poolId: poolId,
                    targetDedicatedComputeNodes: DedicatedNodeCount,
                    targetLowPriorityComputeNodes: LowPriorityNodeCount,
                    virtualMachineSize: PoolVMSize,
                    virtualMachineConfiguration: virtualMachineConfiguration);
                pool.MaxTasksPerComputeNode = maxTaskPerNode;

                // Specify the application package to install on the compute nodes. This assumes a Windows
                // 64-bit zipfile of ffmpeg has been added to the Batch account with Application Id "ffmpeg"
                // and Version "3.4", e.g. ffmpeg-3.4-win64-static.zip from https://ffmpeg.zeranoe.com/.
                pool.ApplicationPackageReferences = new List<ApplicationPackageReference>
                {
                    new ApplicationPackageReference
                    {
                        ApplicationId = appPackageId,
                        Version = appPackageVersion
                    }
                };

                pool.Commit();
            }
            catch (BatchException be)
            {
                // Accept the specific error code PoolExists, which is expected if the pool already exists.
                if (be.RequestInformation?.BatchError?.Code == BatchErrorCodeStrings.PoolExists)
                {
                    // The pool already exists; nothing more to do.
                }
                else
                {
                    throw; // Any other exception is unexpected.
                }
            }
        }

        /// <summary>
        /// Creates a job in the specified pool.
        /// </summary>
        /// <param name="batchClient">A BatchClient object.</param>
        /// <param name="jobId">ID of the job to create.</param>
        /// <param name="poolId">ID of the CloudPool object in which to create the job.</param>
        private void CreateJobIfNotExist(BatchClient batchClient, string jobId, string poolId)
        {
            try
            {
                Console.WriteLine("Creating job [{0}]...", jobId);
                CloudJob job = batchClient.JobOperations.CreateJob();
                job.Id = $"{JobName}";
                job.PoolInformation = new PoolInformation { PoolId = poolId };
                job.Commit();
            }
            catch (BatchException be)
            {
                // Accept the specific error code JobExists, which is expected if the job already exists.
                if (be.RequestInformation?.BatchError?.Code == BatchErrorCodeStrings.JobExists)
                {
                    Console.WriteLine("The job {0} already existed when we tried to create it", jobId);
                }
                else
                {
                    throw; // Any other exception is unexpected.
                }
            }
        }

        /// <summary>
        /// Creates a task to process the specified input file and submits it to the
        /// specified job for execution.
        /// </summary>
        /// <param name="batchClient">A BatchClient object.</param>
        /// <param name="job">The encode job describing the input blob and the ffmpeg command.</param>
        /// <param name="outputContainerSasUrl">The shared access signature URL for the Azure
        /// Storage container that will hold the output files the tasks create.</param>
        /// <returns>A collection of the submitted cloud tasks.</returns>
        private List<CloudTask> AddTasks(BatchClient batchClient, EncodeJob job, string outputContainerSasUrl)
        {
            // Create a collection to hold the tasks added to the job.
            List<CloudTask> tasks = new List<CloudTask>();

            // Assign a unique task ID.
            var taskId = String.Format("Task{0}", Guid.NewGuid());

            // Define the task command line, which runs ffmpeg from the application package
            // with the arguments prepared by the ListeningBlob function.
            string appPath = String.Format("%AZ_BATCH_APP_PACKAGE_{0}#{1}%", appPackageId, appPackageVersion);
            string inputMediaFile = job.Name;
            string outputMediaFile = job.OutputName;
            string taskCommandLine = String.Format("cmd /c {0}\\ffmpeg.exe {1}", appPath, job.Command);

            // Create the cloud task (with the task ID and command line) and add it to the task list.
            CloudTask task = new CloudTask(taskId, taskCommandLine);
            task.ApplicationPackageReferences = new List<ApplicationPackageReference>
            {
                new ApplicationPackageReference
                {
                    ApplicationId = appPackageId,
                    Version = appPackageVersion
                }
            };
            task.ResourceFiles = new List<ResourceFile>();
            task.ResourceFiles.Add(new ResourceFile(Uri.EscapeUriString(job.InputUri.ToString()), inputMediaFile));

            // The task output file will be uploaded to the output container in Storage.
            List<OutputFile> outputFileList = new List<OutputFile>();
            OutputFileBlobContainerDestination outputContainer =
                new OutputFileBlobContainerDestination(outputContainerSasUrl, $"{job.Name}_{job.id}/{job.OutputName}");
            OutputFile outputFile = new OutputFile(
                outputMediaFile,
                new OutputFileDestination(outputContainer),
                new OutputFileUploadOptions(OutputFileUploadCondition.TaskSuccess));
            outputFileList.Add(outputFile);
            task.OutputFiles = outputFileList;
            tasks.Add(task);

            // Call BatchClient.JobOperations.AddTask() with the collection rather than making a separate
            // call per task. Bulk task submission helps keep the underlying calls to the Batch service efficient.
            batchClient.JobOperations.AddTask($"{JobName}", tasks);
            return tasks;
        }

        private CloudBlobClient initBlobClient()
        {
            CloudStorageAccount storageOutputAccount = CloudStorageAccount.Parse(outputStorageConnection);
            // Create a blob client for interacting with the blob service.
            var blobOutputClient = storageOutputAccount.CreateCloudBlobClient();
            return blobOutputClient;
        }

        public void QueueTask(EncodeJob job)
        {
            BatchSharedKeyCredentials sharedKeyCredentials = new BatchSharedKeyCredentials(batchAccountUrl, batchAccount, batchKey);
            var blobClient = initBlobClient();
            var outputContainerSasUrl = GetContainerSasUrl(blobClient, outputContainerName, SharedAccessBlobPermissions.Write);

            using (BatchClient batchClient = BatchClient.Open(sharedKeyCredentials))
            {
                // Create the Batch pool, which contains the compute nodes that execute the tasks.
                CreatePoolIfNotExist(batchClient, PoolId);

                // Create the job that runs the tasks.
                CreateJobIfNotExist(batchClient, $"{JobName}", PoolId);

                // Add the task to the Batch job, providing a shared access signature so it can
                // upload its output to the Storage container.
                AddTasks(batchClient, job, outputContainerSasUrl);
            }
        }

        public async Task<Tuple<string, int>> GetStatus()
        {
            BatchSharedKeyCredentials sharedKeyCredentials = new BatchSharedKeyCredentials(batchAccountUrl, batchAccount, batchKey);
            string result = "Retrieving task information...";
            int total = 0;

            using (BatchClient batchClient = BatchClient.Open(sharedKeyCredentials))
            {
                var counts = await batchClient.JobOperations.GetJobTaskCountsAsync(JobName);
                total = counts.Active + counts.Running + counts.Completed;
                result = $"Total tasks: {total}, waiting: {counts.Active}, running: {counts.Running}, succeeded: {counts.Succeeded}, failed: {counts.Failed}";
            }
            return new Tuple<string, int>(result, total);
        }
    }
}
Because the maximum timeout for an Azure Function is 10 minutes, running large file conversions synchronously would often end in timeout errors. The function therefore returns as soon as the Batch tasks have been queued and lets them run in the background. To monitor how those tasks are progressing, we build a timer-triggered function that checks the task counts and writes the result to status.html in the output blob container.
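For reference, the Consumption-plan timeout mentioned above is configured in host.json via the functionTimeout setting (the default is 5 minutes, the maximum 10); even at the maximum it is not enough for long transcodes, which is why the work is handed off to Batch. A minimal sketch:

{
  "functionTimeout": "00:10:00"
}

The timer-triggered StatusUpdate function follows.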
using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
namespace MS.CSU.mp3encoder
{
    /// <summary>
    /// Publishes the current task processing status.
    /// </summary>
    public static class StatusUpdate
    {
        static int lastTotal = 0;
        static DateTime lastSubmitTime;
        static string targetStorageConnection = Environment.GetEnvironmentVariable("targetStorageConnection");
        static CloudBlobClient blobOutputClient;
        static string blobOutputContainerName = Environment.GetEnvironmentVariable("outputContainer") ?? "output";
        static CloudBlobContainer blobOutputContainer;

        [FunctionName("StatusUpdate")]
        public async static Task Run([TimerTrigger("0 */5 * * * *")]TimerInfo myTimer, TraceWriter log)
        {
            string strStatus = "";
            int jobCount = 0;
            try
            {
                AzureBatch batch = new AzureBatch(targetStorageConnection);
                var result = await batch.GetStatus();
                strStatus = result.Item1;
                jobCount = result.Item2 - lastTotal;
                if (lastTotal != result.Item2)
                {
                    lastTotal = result.Item2;
                    lastSubmitTime = DateTime.Now;
                }
            }
            catch (Exception err)
            {
                strStatus = Uri.EscapeDataString(err.ToString());
            }

            initBlobContainer();
            var statusBlob = blobOutputContainer.GetBlockBlobReference("status.html");
            // Times are shifted by 8 hours to display in China Standard Time (UTC+8).
            string htmlStatus = $@"<html>
<head>
  <meta http-equiv=""refresh"" content=""5"">
  <meta charset=""utf-8"">
</head>
<body>
  <h1>{strStatus}</h1><br/>
  <h1>Last updated: {DateTime.Now.AddHours(8)}</h1>
  <h1>Last task submission time: {lastSubmitTime.AddHours(8)}</h1>
  <h2>Tasks submitted in the last five minutes: {jobCount}</h2>
</body>
</html>";
            await statusBlob.UploadTextAsync(htmlStatus);
        }

        private static void initBlobContainer()
        {
            CloudStorageAccount storageOutputAccount = CloudStorageAccount.Parse(targetStorageConnection);
            // Create a blob client for interacting with the blob service.
            blobOutputClient = storageOutputAccount.CreateCloudBlobClient();
            blobOutputContainer = blobOutputClient.GetContainerReference(blobOutputContainerName);
            blobOutputContainer.CreateIfNotExists();
        }
    }
}
Final result:
References:
Azure Blob storage bindings for Azure Functions
Timer trigger for Azure Functions
Azure Batch .NET File Processing with ffmpeg
FFmpeg MP3 Encoding Guide
Original post (Chinese): http://www.cnblogs.com/wing-ms/p/8423221.html