Overview

Namespaces

  • OpenCloud
    • Autoscale
      • Resource
    • CDN
      • Resource
    • CloudMonitoring
      • Collection
      • Exception
      • Resource
    • Common
      • Collection
      • Constants
      • Exceptions
      • Http
        • Message
      • Log
      • Resource
      • Service
    • Compute
      • Constants
      • Exception
      • Resource
    • Database
      • Resource
    • DNS
      • Collection
      • Resource
    • Identity
      • Constants
      • Resource
    • Image
      • Enum
      • Resource
        • JsonPatch
        • Schema
    • LoadBalancer
      • Collection
      • Enum
      • Resource
    • Networking
      • Resource
    • ObjectStore
      • Constants
      • Enum
      • Exception
      • Resource
      • Upload
    • Orchestration
      • Resource
    • Queues
      • Collection
      • Exception
      • Resource
    • Volume
      • Resource
  • PHP

Classes

  • AbstractTransfer
  • ConcurrentTransfer
  • ConsecutiveTransfer
  • ContainerMigration
  • DirectorySync
  • TransferBuilder
  • TransferPart
  • TransferState
  • Overview
  • Namespace
  • Class
  • Tree
 1: <?php
 2: /**
 3:  * Copyright 2012-2014 Rackspace US, Inc.
 4:  *
 5:  * Licensed under the Apache License, Version 2.0 (the "License");
 6:  * you may not use this file except in compliance with the License.
 7:  * You may obtain a copy of the License at
 8:  *
 9:  * http://www.apache.org/licenses/LICENSE-2.0
10:  *
11:  * Unless required by applicable law or agreed to in writing, software
12:  * distributed under the License is distributed on an "AS IS" BASIS,
13:  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14:  * See the License for the specific language governing permissions and
15:  * limitations under the License.
16:  */
17: 
18: namespace OpenCloud\ObjectStore\Upload;
19: 
20: use Guzzle\Http\EntityBody;
21: use Guzzle\Http\ReadLimitEntityBody;
22: 
/**
 * A transfer type which executes in a concurrent fashion, i.e. with multiple workers uploading at once. Each worker is
 * charged with uploading a particular chunk of data. The entity body is fragmented into n pieces - calculated by
 * dividing the total size by the individual part size.
 *
 * @codeCoverageIgnore
 */
class ConcurrentTransfer extends AbstractTransfer
{
    /**
     * Uploads the entity body in batches. Each batch queues at most one request per worker, sends the queued
     * requests together through the client, and records every returned response in the transfer state. Batches
     * repeat until the transfer state holds as many parts as the body was divided into, or until no further data
     * can be queued.
     */
    public function transfer()
    {
        // Number of parts needed to cover the whole body, rounding the final partial chunk up to a full part
        $totalParts = (int) ceil($this->entityBody->getContentLength() / $this->partSize);

        // Never allocate more workers than there are parts to send
        $workers = min($totalParts, $this->options['concurrency']);
        $parts = $this->collectParts($workers);

        while ($this->transferState->count() < $totalParts) {
            // Snapshot of how many parts are already recorded; every offset in this batch is derived from it
            $completedParts = $this->transferState->count();
            $requests = array();

            // Iterate over number of workers until total completed parts is what we need it to be
            for ($i = 0; $i < $workers && ($completedParts + $i) < $totalParts; $i++) {
                // Offset is the current pointer multiplied by the standard chunk length
                $offset = ($completedParts + $i) * $this->partSize;
                $parts[$i]->setOffset($offset);

                // If this segment is empty (i.e. buffering a half-full chunk), break the iteration
                if ($parts[$i]->getContentLength() == 0) {
                    break;
                }

                // Add this to the request queue for later processing. The second argument counts from 1
                // (presumably the part number expected by TransferPart - confirm against its definition).
                $requests[] = TransferPart::createRequest(
                    $parts[$i],
                    $completedParts + $i + 1,
                    $this->client,
                    $this->options
                );
            }

            // Nothing could be queued, so the transfer state cannot grow any further. Without this guard the
            // enclosing loop condition would never change and the method would spin forever.
            if (count($requests) === 0) {
                break;
            }

            // Iterate over our queued requests and process them
            foreach ($this->client->send($requests) as $response) {
                // Add this part to the TransferState
                $this->transferState->addPart(TransferPart::fromResponse($response));
            }
        }
    }

    /**
     * Partitions the entity body into an array - each worker is represented by a key, and the value is a
     * ReadLimitEntityBody object, whose read limit is fixed based on this object's partSize value.
     *
     * The first element wraps this transfer's own entity body; every further element wraps a separate read-only
     * handle opened on the same URI. At least one element is always returned, even when $workers is 0.
     *
     * @param int $workers The total number of workers
     * @return array The worker array, indexed from 0
     */
    private function collectParts($workers)
    {
        $uri = $this->entityBody->getUri();

        // Worker 0 reuses the entity body that is already open
        $array = array(new ReadLimitEntityBody($this->entityBody, $this->partSize));

        for ($i = 1; $i < $workers; $i++) {
            // Need to create a fresh EntityBody, otherwise you'll get weird 408 responses
            $array[] = new ReadLimitEntityBody(new EntityBody(fopen($uri, 'r')), $this->partSize);
        }

        return $array;
    }
}
92: 
API documentation generated by ApiGen 2.8.0