• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JohnSnowLabs / spark-nlp / 8054290840

26 Feb 2024 07:11PM UTC coverage: 62.707%. First build
8054290840

Pull #14164

github

web-flow
Merge e9fdbe61f into e805c4308
Pull Request #14164: release/530-release-candidate

8964 of 14295 relevant lines covered (62.71%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/scala/com/johnsnowlabs/client/azure/AzureGateway.scala
1
package com.johnsnowlabs.client.azure
2

3
import com.azure.storage.blob.models.ListBlobsOptions
4
import com.azure.storage.blob.{BlobServiceClient, BlobServiceClientBuilder}
5
import com.johnsnowlabs.client.CloudStorage
6
import com.johnsnowlabs.nlp.util.io.ResourceHelper
7
import org.apache.hadoop.fs.{FileSystem, Path}
8
import org.apache.hadoop.io.IOUtils
9

10
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStream, InputStream}
11
import scala.jdk.CollectionConverters.asScalaIteratorConverter
12

13
/** [[CloudStorage]] implementation backed by Azure Blob Storage.
  *
  * In this gateway a "bucket" is an Azure blob container and every path is a blob name inside
  * that container.
  *
  * @param storageAccountName
  *   Azure storage account name, used to build the connection string
  * @param accountKey
  *   shared key of the storage account, used to build the connection string
  */
class AzureGateway(storageAccountName: String, accountKey: String) extends CloudStorage {

  /** Client for the storage account, built on first use (lazy) from a connection string. */
  private lazy val blobServiceClient: BlobServiceClient = {
    val connectionString =
      s"DefaultEndpointsProtocol=https;AccountName=$storageAccountName;AccountKey=$accountKey;EndpointSuffix=core.windows.net"

    new BlobServiceClientBuilder()
      .connectionString(connectionString)
      .buildClient()
  }

  /** Checks whether at least one blob exists under `filePath`, treated as a directory.
    *
    * A trailing `/` is appended to `filePath` when missing, so `"models/ner"` matches blobs named
    * `models/ner/...` but not `models/ner_v2/...`.
    *
    * @param bucketName
    *   name of the blob container
    * @param filePath
    *   directory-like path inside the container
    * @return
    *   `true` if some blob name starts with the normalized prefix
    */
  override def doesBucketPathExist(bucketName: String, filePath: String): Boolean = {
    val prefix = if (filePath.endsWith("/")) filePath else filePath + "/"

    // Filter server-side and ask for a single result per page: we only need to know whether a
    // match exists, not enumerate the whole container.
    val listOptions = new ListBlobsOptions()
      .setPrefix(prefix)
      .setMaxResultsPerPage(1)

    blobServiceClient
      .getBlobContainerClient(bucketName)
      .listBlobs(listOptions, null)
      .iterator()
      .hasNext
  }

  /** Uploads `inputStream` as a block blob at `destinationPath` in container `bucketName`.
    *
    * The stream is read fully into memory so the exact byte length is known before uploading.
    * The caller keeps ownership of `inputStream`; it is not closed here.
    *
    * @param bucketName
    *   name of the blob container
    * @param destinationPath
    *   blob name to create
    * @param inputStream
    *   data to upload
    */
  override def copyFileToBucket(
      bucketName: String,
      destinationPath: String,
      inputStream: InputStream): Unit = {
    // InputStream.available() only estimates how many bytes can be read without blocking; it is
    // not the stream length and must not be used as the blob size. Read everything instead.
    val buffer = new ByteArrayOutputStream()
    IOUtils.copyBytes(inputStream, buffer, 4096, false)

    uploadBytes(bucketName, destinationPath, buffer.toByteArray)
  }

  /** Reads the file at `sourceFilePath` via Hadoop and uploads it as blob `filePath`.
    *
    * @param bucketName
    *   name of the blob container
    * @param filePath
    *   blob name to create
    * @param sourceFilePath
    *   path of the source file, resolved with the Spark session's Hadoop configuration
    */
  override def copyInputStreamToBucket(
      bucketName: String,
      filePath: String,
      sourceFilePath: String): Unit = {
    val sourcePath = new Path(sourceFilePath)
    // Resolve the file system from the path's own scheme (file://, hdfs://, s3a://, ...) instead
    // of always using the default file system, which may reject paths of another scheme.
    val fileSystem: FileSystem =
      sourcePath.getFileSystem(ResourceHelper.spark.sparkContext.hadoopConfiguration)

    val buffer = new ByteArrayOutputStream()
    // close = true: Hadoop closes both streams when the copy ends, including on failure.
    IOUtils.copyBytes(fileSystem.open(sourcePath), buffer, 4096, true)

    uploadBytes(bucketName, filePath, buffer.toByteArray)
  }

  /** Downloads every blob whose name starts with `filePath` into `directoryPath`.
    *
    * Each blob is written to `directoryPath/<full blob name>`, creating parent directories as
    * needed. Blob names ending in `/` only create the corresponding directory.
    *
    * @param bucketName
    *   name of the blob container
    * @param filePath
    *   raw name prefix; no trailing `/` is added, so `"a/b"` also matches `"a/bc"`
    * @param directoryPath
    *   local directory that receives the files
    * @param isIndex
    *   not used by this implementation
    * @throws Exception
    *   if no blob matches the prefix or any listing/download step fails. The original failure
    *   is kept as the cause.
    */
  override def downloadFilesFromBucketToDirectory(
      bucketName: String,
      filePath: String,
      directoryPath: String,
      isIndex: Boolean): Unit = {
    try {
      val blobContainerClient = blobServiceClient.getBlobContainerClient(bucketName)
      val blobOptions = new ListBlobsOptions().setPrefix(filePath)
      val blobs = blobContainerClient
        .listBlobs(blobOptions, null)
        .iterator()
        .asScala
        .toList

      if (blobs.isEmpty) {
        throw new Exception(
          s"Not found blob path $filePath in container $bucketName when downloading files from Azure Blob Storage")
      }

      blobs.foreach { blobItem =>
        val blobName = blobItem.getName
        val file = new File(s"$directoryPath/$blobName")
        if (blobName.endsWith("/")) {
          file.mkdirs()
        } else {
          file.getParentFile.mkdirs()
          val outputStream = new FileOutputStream(file)
          // Always release the file handle, even if the download fails midway.
          try blobContainerClient.getBlobClient(blobName).downloadStream(outputStream)
          finally outputStream.close()
        }
      }
    } catch {
      case e: Exception =>
        throw new Exception(
          "Error when downloading files from Azure Blob Storage: " + e.getMessage,
          e)
    }
  }

  /** Uploads an in-memory payload as block blob `blobPath` in container `bucketName`.
    *
    * NOTE(review): no overwrite flag is passed to `upload`; per the SDK docs this overload
    * presumably fails if the blob already exists — behavior kept from the original code.
    */
  private def uploadBytes(bucketName: String, blobPath: String, bytes: Array[Byte]): Unit = {
    val blockBlobClient = blobServiceClient
      .getBlobContainerClient(bucketName)
      .getBlobClient(blobPath)
      .getBlockBlobClient

    blockBlobClient.upload(new ByteArrayInputStream(bytes), bytes.length.toLong)
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc