• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JohnSnowLabs / spark-nlp / 8054290840

26 Feb 2024 07:11PM UTC coverage: 62.707%. First build
8054290840

Pull #14164

github

web-flow
Merge e9fdbe61f into e805c4308
Pull Request #14164: release/530-release-candidate

8964 of 14295 relevant lines covered (62.71%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/scala/com/johnsnowlabs/client/CloudResources.scala
1
/*
2
 * Copyright 2017-2022 John Snow Labs
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *    http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package com.johnsnowlabs.client
17

18
import com.amazonaws.services.s3.model.S3Object
19
import com.johnsnowlabs.client.aws.{AWSClient, AWSGateway}
20
import com.johnsnowlabs.client.azure.AzureClient
21
import com.johnsnowlabs.client.gcp.GCPClient
22
import com.johnsnowlabs.client.util.CloudHelper
23
import com.johnsnowlabs.client.util.CloudHelper.transformURIToWASB
24
import com.johnsnowlabs.util.{ConfigHelper, ConfigLoader}
25
import org.apache.commons.io.IOUtils
26
import org.apache.spark.SparkFiles
27

28
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
29
import java.net.URI
30
import java.nio.file.Paths
31
import java.util.zip.ZipInputStream
32

33
object CloudResources {

  /** Makes a model stored as a zip archive in S3 available, unzipped, in the external cloud
    * storage pointed to by `cachePath` (AWS S3, GCP Storage or Azure Blob Storage).
    *
    * The zip archive is only fetched from S3 when `cachePath` does not already contain
    * `modelName`.
    *
    * @param awsGateway
    *   Gateway used to read the zipped model from S3
    * @param cachePath
    *   URI of the destination cloud storage; it also selects the cloud client
    * @param modelName
    *   Name of the model folder looked up under `cachePath`
    * @param sourceS3URI
    *   S3 URI of the zipped model
    * @return
    *   URI of the model folder in the destination storage. For S3 the `s3:` scheme is rewritten
    *   to `s3a:`; for Azure the URI is converted with `transformURIToWASB`. When the model is
    *   uploaded, the folder name is derived from the zip file name rather than `modelName`.
    */
  def downloadModelFromCloud(
      awsGateway: AWSGateway,
      cachePath: String,
      modelName: String,
      sourceS3URI: String): Option[String] = {

    val (sourceBucketName, sourceKey) = CloudHelper.parseS3URI(sourceS3URI)

    // The archive is fetched only when it is actually going to be unzipped, so that an S3 object
    // (and its open content stream) is not left dangling when the model is already cached.
    // unzipInExternalCloudStorage closes the content stream when it finishes.
    def uploadModel(zipReference: String, destinationURI: String, client: CloudClient): String = {
      val zippedModel = awsGateway.getS3Object(sourceBucketName, sourceKey)
      unzipInExternalCloudStorage(zipReference, destinationURI, client, zippedModel)
    }

    val cloudManager = new CloudManager()
    val clientInstance = cloudManager.getClientInstance(cachePath)

    clientInstance match {
      case awsClient: AWSClient =>
        val destinationS3URI = cachePath.replace("s3:", "s3a:")
        if (doesModelExistInExternalCloudStorage(modelName, destinationS3URI, awsClient))
          Option(destinationS3URI + "/" + modelName)
        else
          Option(uploadModel(sourceKey, destinationS3URI, awsClient))

      case gcpClient: GCPClient =>
        if (doesModelExistInExternalCloudStorage(modelName, cachePath, gcpClient))
          Option(cachePath + "/" + modelName)
        else
          Option(uploadModel(sourceS3URI, cachePath, gcpClient))

      case azureClient: AzureClient =>
        val modelURI =
          if (doesModelExistInExternalCloudStorage(modelName, cachePath, azureClient))
            cachePath + "/" + modelName
          else
            uploadModel(sourceS3URI, cachePath, azureClient)
        Some(transformURIToWASB(modelURI))
    }
  }

  /** Checks whether `destinationURI/modelName` exists in the bucket/container addressed by
    * `destinationURI`, using the URI parser that matches the given client.
    */
  private def doesModelExistInExternalCloudStorage(
      modelName: String,
      destinationURI: String,
      cloudClient: CloudClient): Boolean =
    cloudClient match {
      case awsClient: AWSClient =>
        val (bucketName, keyPrefix) = CloudHelper.parseS3URI(destinationURI)
        awsClient.doesBucketPathExist(bucketName, s"$keyPrefix/$modelName")
      case gcpClient: GCPClient =>
        val (bucketName, storagePath) = CloudHelper.parseGCPStorageURI(destinationURI)
        gcpClient.doesBucketPathExist(bucketName, s"$storagePath/$modelName")
      case azureClient: AzureClient =>
        val (containerName, storagePath) = CloudHelper.parseAzureBlobURI(destinationURI)
        azureClient.doesBucketPathExist(containerName, s"$storagePath/$modelName")
    }

  /** Uploads every file entry of `zippedModel` to `destinationStorageURI/<model>/<entry>`, where
    * `<model>` is the zip file name taken from `sourceKey` without its `.zip` extension (or the
    * whole file name when it has no `.zip` extension).
    *
    * Each entry is fully buffered in memory before being uploaded. The zip stream, and with it
    * the S3 object content, is closed when this method returns or throws.
    *
    * @return
    *   `destinationStorageURI + "/" + <model>`
    */
  private def unzipInExternalCloudStorage(
      sourceKey: String,
      destinationStorageURI: String,
      cloudClient: CloudClient,
      zippedModel: S3Object): String = {

    val zipInputStream = new ZipInputStream(zippedModel.getObjectContent)
    try {
      val zipFile = sourceKey.split("/").last
      val extensionIndex = zipFile.indexOf(".zip")
      val modelName =
        if (extensionIndex >= 0) zipFile.substring(0, extensionIndex) else zipFile

      // Destination resolution is done once (on the first file entry) instead of per entry;
      // for S3 this avoids building a new CloudManager/client for every file in the archive.
      lazy val uploadEntry: (String, ByteArrayInputStream) => Unit = cloudClient match {
        case _: AWSClient =>
          val (awsGatewayDestination, bucketName, keyPrefix) = getS3Config(destinationStorageURI)
          (entryName: String, content: ByteArrayInputStream) =>
            awsGatewayDestination.copyFileToBucket(
              bucketName,
              s"$keyPrefix/$modelName/$entryName",
              content)
        case gcpClient: GCPClient =>
          val (bucketName, storagePath) = CloudHelper.parseGCPStorageURI(destinationStorageURI)
          (entryName: String, content: ByteArrayInputStream) =>
            gcpClient.copyFileToBucket(
              bucketName,
              s"$storagePath/$modelName/$entryName",
              content)
        case azureClient: AzureClient =>
          val (containerName, storagePath) =
            CloudHelper.parseAzureBlobURI(destinationStorageURI)
          (entryName: String, content: ByteArrayInputStream) =>
            azureClient.copyFileToBucket(
              containerName,
              s"$storagePath/$modelName/$entryName".stripPrefix("/"),
              content)
      }

      println(s"Uploading model $modelName to external Cloud Storage URI: $destinationStorageURI")
      Iterator
        .continually(zipInputStream.getNextEntry)
        .takeWhile(_ != null)
        .filterNot(_.isDirectory)
        .foreach { entry =>
          val buffer = new ByteArrayOutputStream()
          // ZipInputStream reports end-of-stream at the end of the current entry, so this copies
          // exactly one entry.
          IOUtils.copy(zipInputStream, buffer)
          uploadEntry(entry.getName, new ByteArrayInputStream(buffer.toByteArray))
        }

      destinationStorageURI + "/" + modelName
    } finally {
      zipInputStream.close()
    }
  }

  /** Builds an AWS client for `destinationS3URI` and splits the URI into bucket and key.
    *
    * @throws IllegalArgumentException
    *   if the client created for the URI is not an [[AWSClient]]
    */
  private def getS3Config(destinationS3URI: String): (AWSClient, String, String) = {
    val cloudManager = new CloudManager()
    val clientInstance = cloudManager.getClientInstance(destinationS3URI)

    val (destinationBucketName, destinationKey) = CloudHelper.parseS3URI(destinationS3URI)
    clientInstance match {
      case awsClient: AWSClient => (awsClient, destinationBucketName, destinationKey)
      case other =>
        throw new IllegalArgumentException(
          s"Expected an AWS client for $destinationS3URI but got: $other")
    }
  }

  /** Copies the local log file at `targetPath` to cloud storage.
    *
    * The destination is `outputLogsPath`, or the configured annotator log folder when
    * `outputLogsPath` is empty; that resolved path also selects the cloud client.
    */
  def storeLogFileInCloudStorage(outputLogsPath: String, targetPath: String): Unit = {
    val parameters = Map("credentialsType" -> "proprietary")
    val cloudManager = new CloudManager(parameters)
    val logsPath = if (outputLogsPath.nonEmpty) outputLogsPath else getLogsFolder
    val clientInstance = cloudManager.getClientInstance(logsPath)

    clientInstance match {
      // storeLogFileInS3 implements its own fallback to the configured bucket/folder, so it
      // keeps receiving the raw outputLogsPath.
      case awsClient: AWSClient => storeLogFileInS3(outputLogsPath, targetPath, awsClient)
      // GCP/Azure parse the path they receive directly, so they get the same resolved path that
      // was used to pick the client (an empty outputLogsPath is not a parsable storage URI).
      case gcpClient: GCPClient => storeLogFileInGCPStorage(logsPath, targetPath, gcpClient)
      case azureClient: AzureClient =>
        storeLogFileInAzureStorage(logsPath, targetPath, azureClient)
    }
  }

  /** Uploads the log file to S3, either under `outputLogsPath` (when it has an `s3`/`s3a`
    * scheme) or under the bucket/folder configured in `ConfigHelper`.
    */
  private def storeLogFileInS3(
      outputLogsPath: String,
      targetPath: String,
      awsClient: AWSClient): Unit = {

    def parseConfigS3Path(): (String, String) = {
      val s3Bucket = ConfigLoader.getConfigStringValue(ConfigHelper.awsExternalS3BucketKey)
      val s3Path = ConfigLoader.getConfigStringValue(ConfigHelper.annotatorLogFolder) + "/"
      (s3Bucket, s3Path)
    }

    val logsPathScheme = outputLogsPath.takeWhile(_ != ':')
    val (s3Bucket, s3Path) = logsPathScheme match {
      case "s3" | "s3a" => CloudHelper.parseS3URI(outputLogsPath, includePrefixInKey = true)
      // startsWith("s3") also covers folders using the "s3a" scheme.
      case _ if getLogsFolder.startsWith("s3") => parseConfigS3Path()
      case _ => throw new IllegalArgumentException("Unsupported outputLogsPath")
    }

    // NOTE(review): drops the first "s3://".length characters of s3Path; assumes the key keeps
    // an "s3://" prefix here — confirm for "s3a://" paths.
    val s3FilePath = s"${s3Path.substring("s3://".length)}/${targetPath.split("/").last}"
    awsClient.copyInputStreamToBucket(s3Bucket, s3FilePath, targetPath)
  }

  /** Annotator log folder read from the configuration. */
  private def getLogsFolder: String =
    ConfigLoader.getConfigStringValue(ConfigHelper.annotatorLogFolder)

  /** Uploads the log file at `targetPath` into the GCP Storage folder `outputLogsPath`. */
  private def storeLogFileInGCPStorage(
      outputLogsPath: String,
      targetPath: String,
      gcpClient: GCPClient): Unit = {
    val (gcpBucket, storagePath) = CloudHelper.parseGCPStorageURI(outputLogsPath)
    val fileName = Paths.get(targetPath).getFileName.toString
    gcpClient.copyInputStreamToBucket(gcpBucket, s"$storagePath/$fileName", targetPath)
  }

  /** Uploads the log file at `targetPath` into the Azure Blob Storage folder `outputLogsPath`. */
  private def storeLogFileInAzureStorage(
      outputLogsPath: String,
      targetPath: String,
      azureClient: AzureClient): Unit = {
    val (azureBucket, storagePath) = CloudHelper.parseAzureBlobURI(outputLogsPath)
    val fileName = Paths.get(targetPath).getFileName.toString
    azureClient.copyInputStreamToBucket(azureBucket, s"$storagePath/$fileName", targetPath)
  }

  /** Downloads the provided bucket path to a local temporary directory and returns the location
    * of the folder.
    *
    * @param bucketURI
    *   Bucket URI to the resource
    * @param tempLocalPath
    *   The tmp local directory; defaults to the Spark files root directory when empty
    * @param isIndex
    *   Whether the file is used in RocksDB storage
    * @return
    *   URI of the local path to the temporary folder of the resource (`directory/keyPrefix`)
    */
  def downloadBucketToLocalTmp(
      bucketURI: String,
      tempLocalPath: String = "",
      isIndex: Boolean = false): URI = {
    // This method used to be known as ResourceDownloader.downloadS3Directory
    val cloudManager = new CloudManager()
    val clientInstance = cloudManager.getClientInstanceFromConfigurationParams(bucketURI)
    val directory =
      if (tempLocalPath.isEmpty) SparkFiles.getRootDirectory() else tempLocalPath

    val keyPrefix = clientInstance match {
      case awsClient: AWSClient =>
        val (bucketName, prefix) = CloudHelper.parseS3URI(bucketURI)
        awsClient.downloadFilesFromBucketToDirectory(bucketName, prefix, directory, isIndex)
        prefix
      case gcpClient: GCPClient =>
        val (bucketName, prefix) = CloudHelper.parseGCPStorageURI(bucketURI)
        gcpClient.downloadFilesFromBucketToDirectory(bucketName, prefix, directory, isIndex)
        prefix
      case azureClient: AzureClient =>
        val (bucketName, prefix) = CloudHelper.parseAzureBlobURI(bucketURI)
        azureClient.downloadFilesFromBucketToDirectory(bucketName, prefix, directory, isIndex)
        prefix
    }

    Paths.get(directory, keyPrefix).toUri
  }

}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc