Follow Me Widget

четверг, 17 февраля 2011 г.

Разработка приложений под Windows Azure. Хранилища данных. Часть №2. Таблицы и очереди.

Добрый день. Мы продолжаем с вами знакомиться с технологиями хранения данных в Windows Azure. Сегодня поговорим о таблицах и очередях. Что это такое и с чем его едят.

Очередь.
И начнем мы с очередей. Если давать определение очереди с точки зрения программиста, то это механизм хранения данных работающий по принципу FIFO (First In First Out). Да, это довольно таки старый паттерн, но он нашел свое применение в облаке. Нашел, но несколько в ином обличии. Во-первых используя Windows Azure Queues мы не можем быть уверенными, что сообщения будут поступать и изыматься из очереди по ее основному принципу FIFO, то есть порядок не гарантируется. Именно из-за этого ограничения
облачная очередь служит для несколько иных целей, а именно для организации асинхронной системы. То есть Windows Azure Queues служит своеобразным коммуникационным каналом между различными частями одного и того же приложения, либо между совершенно разными приложениями. Например, хорошим примером может быть сценарий размещения изображений на вашем сервере, в котором пользователи могут отправлять новые изображения посредством Веб-роли, а ваша фоновая worker-роль будет создавать для них уменьшенные копии. При таком сценарии имеет смысл отправлять новые работы по созданию изображений различных масштабов фоновому процессу посредством Windows Azure Queue. Давайте ближе познакомимся с очередью. Логическая организация очереди представлена на рисунке ниже:
Queue Storage Concepts
Как мы видим, на верхнем уровне иерархии находится аккаунт, аккаунт может содержать неограниченное количество очередей, каждая из которых способна хранить неограниченное количество сообщений. Список операций, которые мы можем выполнять над очередью также довольно таки стандартный:
  • Получение списка очередей, привязанных к аккаунту
  • Создание/удаление очереди
  • Получение/установка метаданных очереди
  • Очистка очереди
  • Добавление/получение/удаление сообщений
Основными преимуществами очереди являются ее высокая отказоустойчивость и гарантия доставки/обработки сообщений. Последнее достигается за счет того, что в момент изъятия сообщения оно физически не удаляется а всего лишь становится невидимым, поэтому если произойдет какой либо сбой в обрабатывающем сообщение сервисе, то оно станет снова видимым для других по истечении определенного промежутка времени. Удаляются сообщения из очереди по отдельной соответствующей команде.
Следующая нереляционная технология, о которой я хотел бы рассказать – это хранение данных при помощи таблиц или Windows Azure Table Storage.

Таблицы.
Согласно определению, таблицы представляют собой хранилища данных, в которых эти данные организованы в сущности, а сущности в свою очередь состоят из свойств. Схематически это будет выглядеть так:
Table Storage Concepts
На верхнем уровне иерархии без изменений – это опять же наш аккаунт. Каждый аккаунт может включать  в себя неограниченное количество таблиц. Таблица содержит множество сущностей (можно провести аналогию со строкой в таблице реляционной базы данных) каждая из которых может содержать множество свойств (по аналогии с колонкой в таблице реляционной базы данных). Как мы видим концептуально некая схожесть с реляционными таблицами присутствует, но не более. Во-первых Windows Azure Tables совершенно независимы от схемы в отличии от реляционных таблиц, это означает, что в контексте одной таблицы могут присутствовать объекты с совершенно разным  набором полей-свойств. Во-вторых нет никакой ссылочной целостности, то есть джойны здесь не работают. Из других особенностей следует отметить следующие:
  • Сущность может иметь до 255 полей и ее размер не должен превышать 1 Мб
  • Сущность идентифицируется набором идентификаторов секции и строки
  • Поля могут быть стандартными .NET типами
Особый интерес вызывает способ идентификации сущностей, согласно которому уникально сущность идентифицируют 2 ключа: внешний и внутренний (PartitionKey and RowKey). Внутренний или строковый служит для идентификации сущности в пределах секции. Секционный же предоставляет вам возможность группировать определенные наборы в пределах одного хранилища. Это значит, что сущности с одинаковым секционным ключом будут располагаться на одном физическом сервере. Благодаря этому значительно повышается эффективность запросов и возможности по масштабированию отдельных таблиц. Следует заметить, что нам как разработчикам доступен довольно таки обширный функционал по манипулированию таблицами и сущностями, сюда входят стандартные CRUD операции, а также некоторые возможности по управлению табличными метаданными.

Пример взаимодействия с очередью и хранилищем блобов.
На этом я хотел бы закончить с теорией и продемонстрировать фрагменты кода по взаимодействию с хранилищем блобов и очередью. Приложение содержит в себе 2 роли – это web-роль и worker-роль.
Thumbnails application
Пользователи взаимодействуют с веб-ролью и имеют возможность загружать изображения в хранилище блобов. В свою очередь при каждой загрузке нового изображения веб-роль посылает сообщение worker-роли о наличии нового изображения. Увидев это, worker-роль автоматически создает для изображения  его уменьшенную версию и сохраняет ее все в том же хранилище блобов. Коммуникация между ролями осуществляется при помощи очереди. Процесс сохранения загруженного изображения в хранилище блобов представлен чуть ниже:

  1. // В первую очередь получаем ссылку на аккаунт, строку соединения берем из конфигурационного файла  
  2. var storageAccount = CloudStorageAccount.FromConfigurationSetting("DataConnectionString");  
  3. CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();  
  4. // Дальше получаем ссылку на целевой контейнер и создаем его если он не существует  
  5. CloudBlobContainer container = blobClient.GetContainerReference("photogallery");  
  6. container.CreateIfNotExist();  
  7. // Устанавливаем уровни доступа к контейнеру, в данном случае у нас открытый контейнер  
  8. var permissions = container.GetPermissions();  
  9. permissions.PublicAccess = BlobContainerPublicAccessType.Container;  
  10. container.SetPermissions(permissions);  
  11. // Создаем клиент к сервису очередей  
  12. CloudQueueClient queueStorage = storageAccount.CreateCloudQueueClient();  
  13. // Получаем ссылку на интересующую нас очередь и создаем ее если она не существует  
  14. CloudQueue queue = queueStorage.GetQueueReference("thumbnailmaker");  
  15. queue.CreateIfNotExist();  
  16. // Далее получаем ссылку на интересующий нас блоб, фукнция принимает имя блоба  
  17. var targetBlob = container.GetBlockBlobReference("BlobImage.jpg");  
  18. // и загружаем в него содержимое изображения  
  19. targetBlob.UploadFromStream(inputFile.FileContent);  
  20. // Последним действием отправляем сообщение в очередь с именем загруженного блоба для последующей обработки  
  21. queue.AddMessage(new CloudQueueMessage(System.Text.Encoding.UTF8.GetBytes("BlobImage.jpg")));  
Фоновая программа, наша worker-роль постоянно прослушивает очередь на предмет поступления новых данных. Как только данное событие наступает запускается механизм создания уменьшенной копии изображения и сохранения ее в подкаталог thumbnails хранилища блобов. Код выполняющий данные действия представлен ниже:
  1. public class WorkerRole : RoleEntryPoint  
  2. {  
  3. private Stream CreateThumbnail(Stream input)  
  4. {  
  5. // Создаем уменьшенное изображение и возвращаем его виде потока вызывающему коду  
  6. }  
  7.   
  8. public override bool OnStart()  
  9. {  
  10. // Код выполняющий инициализацию роли.   
  11. }  
  12.   
  13. public override void Run()  
  14. {  
  15. CloudBlobClient blobStorage = null ;   
  16. CloudBlobContainer blobContainer = null ;  
  17.   
  18. CloudQueueClient queueStorage = null ;   
  19. CloudQueue queue = null ;  
  20. // Получение ссылки на аккаунт  
  21. var storageAccount = CloudStorageAccount.FromConfigurationSetting("DataConnectionString");  
  22. bool containerAndQueueCreated = false;  
  23. while (!containerAndQueueCreated)  
  24. {  
  25. try  
  26. {  
  27. // Инициалируем клиент. предназначенный для взаимодействия с хранилищем блобов  
  28. blobStorage = storageAccount.CreateCloudBlobClient();  
  29. // Получаем ссылку интересующий нас контейнер  
  30. blobContainer = blobStorage.GetContainerReference("photogallery");  
  31. //  Создаем клиент для взаимодействия с очередью и получаем ссылку на конкретную очередь, которая будет содержать наши сообщения  
  32. queueStorage = storageAccount.CreateCloudQueueClient();  
  33. queue = queueStorage.GetQueueReference("thumbnailmaker");  
  34. queue.CreateIfNotExist();  
  35. containerAndQueueCreated = true;  
  36.   
  37. }  
  38. catch (StorageClientException e)  
  39. {  
  40. if (e.ErrorCode == StorageErrorCode.TransportError)  
  41. {  
  42. Trace.TraceError(string.Format("Connect failure! The most likely reason is that the local " +  
  43. "Development Storage tool is not running or your storage account configuration is incorrect. " +  
  44. "Message: '{0}'", e.Message));  
  45. System.Threading.Thread.Sleep(5000);  
  46. }  
  47. else  
  48. {  
  49. throw;  
  50. }  
  51. }  
  52. }  
  53.   
  54. Trace.TraceInformation("Listening for queue messages...");  
  55.   
  56. // Входим в бесконечный цикл ожидания поступления новых данных  
  57. while (true)  
  58. {  
  59. try  
  60. {  
  61. CloudQueueMessage msg = queue.GetMessage();  
  62. // Если из очереди получили новое сообщение то приступаем к его обработке  
  63. if (msg != null)  
  64. {  
  65. string path = msg.AsString;  
  66. string thumbnailName = System.IO.Path.GetFileNameWithoutExtension(path) + ".jpg";  
  67. Trace.TraceInformation(string.Format("Dequeued '{0}'", path));  
  68. // Сообщение содержит имя блоба, получаем ссылку на этот блоб  
  69. CloudBlockBlob content = blobContainer.GetBlockBlobReference(path);  
  70. // и сохраняем уменьшенную версию с префиксом thumbnails/  
  71. CloudBlockBlob thumbnail = blobContainer.GetBlockBlobReference("thumbnails/" + thumbnailName);  
  72. thumbnail.Properties.ContentType = "image/jpeg";  
  73. MemoryStream image = new MemoryStream();  
  74. content.DownloadToStream(image);  
  75. image.Seek(0, SeekOrigin.Begin);  
  76. Stream tn = CreateThumbnail(image);  
  77. thumbnail.UploadFromStream(tn);   
  78. // Физически удаляем сообщение из очереди  
  79. queue.DeleteMessage(msg);  
  80. }  
  81. // Если новых сообщение в очереди нет, то засыпаем на 5 секунд  
  82. else  
  83. {  
  84. System.Threading.Thread.Sleep(5000);  
  85. }  
  86. }  
  87. catch (Exception e)  
  88. {  
  89. // Explicitly catch all exceptions of type StorageException here because we should be able to   
  90. // recover from these exceptions next time the queue message, which caused this exception,  
  91. // becomes visible again.  
  92.   
  93. System.Threading.Thread.Sleep(5000);  
  94. Trace.TraceError(string.Format("Exception when processing queue item. Message: '{0}'", e.Message));  
  95. }  
  96. }  
  97. }  
  98. }  
На этом я хотел бы закончить часть посвященную нереляционным хранилищам данных в Windows Azure. В последующих публикациях я поведаю Вам об альтернативном реляционном варианте хранения данных в облаке под названием SQLAzure. Спасибо за внимание !

Комментариев нет: