
JohnSnowLabs / spark-nlp · build 4947838414 (pending completion)

Pull Request #13796: Add unzip param to downloadModelDirectly in ResourceDownloader
GitHub · Merge 30bdeef19 into ef7906c5e

39 of 39 new or added lines in 2 files covered (100.0%)
8632 of 13111 relevant lines covered (65.84%)
0.66 hits per line

Source File: /src/main/scala/com/johnsnowlabs/storage/StorageHelper.scala (59.18% of lines covered)
/*
 * Copyright 2017-2022 John Snow Labs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.johnsnowlabs.storage

import com.johnsnowlabs.nlp.pretrained.ResourceDownloader
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkContext, SparkFiles}

import java.io.File

object StorageHelper {

  /** Builds the storage folder name as `<database>_<storageRef>`. */
  def resolveStorageName(database: String, storageRef: String): String =
    new Path(database + "_" + storageRef).toString

  /** Resolves the serialized index under `storageSourcePath`, ships it to the
    * cluster (or local) destination, and opens a RocksDB connection to it.
    */
  def load(
      storageSourcePath: String,
      spark: SparkSession,
      database: String,
      storageRef: String,
      withinStorage: Boolean): RocksDBConnection = {

    val dbFolder = StorageHelper.resolveStorageName(database, storageRef)

    val source = StorageLocator.getStorageSerializedPath(
      storageSourcePath.replaceAllLiterally("\\", "/"),
      dbFolder,
      withinStorage)

    val locator = StorageLocator(database, storageRef, spark)

    sendToCluster(
      source,
      locator.clusterFilePath,
      locator.clusterFileName,
      locator.destinationScheme,
      spark.sparkContext)

    RocksDBConnection.getOrCreate(locator.clusterFileName)
  }

  /** Copies the connection's local RocksDB index to `path`, appending a
    * `/storage/` suffix when `withinStorage` is true.
    */
  def save(
      path: String,
      connection: RocksDBConnection,
      spark: SparkSession,
      withinStorage: Boolean): Unit = {
    val indexUri = "file://" + (new java.net.URI(
      connection.findLocalIndex.replaceAllLiterally("\\", "/")).getPath)
    val index = new Path(indexUri)

    val uri = new java.net.URI(path.replaceAllLiterally("\\", "/"))
    val fs = FileSystem.get(uri, spark.sparkContext.hadoopConfiguration)
    val dst = new Path(path + {
      if (withinStorage) "/storage/" else ""
    })

    save(fs, index, dst)
  }

  private def save(fs: FileSystem, index: Path, dst: Path): Unit = {
    if (!fs.exists(dst))
      fs.mkdirs(dst)
    fs.copyFromLocalFile(false, true, index, dst)
  }

  /** Dispatches on the destination scheme: local destinations get a direct
    * copy, everything else goes through the cluster filesystem.
    */
  def sendToCluster(
      source: Path,
      clusterFilePath: Path,
      clusterFileName: String,
      destinationScheme: String,
      sparkContext: SparkContext): Unit = {
    destinationScheme match {
      case "file" => {
        val destination = new Path(RocksDBConnection.getLocalPath(clusterFileName))
        copyIndexToLocal(source, destination, sparkContext)
      }
      case _ => copyIndexToCluster(source, clusterFilePath, sparkContext)
    }
  }

  /** Copies the index to the cluster filesystem (unless it is already there)
    * and registers it with `SparkContext.addFile`.
    */
  private def copyIndexToCluster(
      sourcePath: Path,
      dst: Path,
      sparkContext: SparkContext): String = {
    if (!new File(SparkFiles.get(dst.getName)).exists()) {
      val srcFS = sourcePath.getFileSystem(sparkContext.hadoopConfiguration)
      val dstFS = dst.getFileSystem(sparkContext.hadoopConfiguration)

      if (srcFS.getScheme == "file") {
        val src = sourcePath
        dstFS.copyFromLocalFile(false, true, src, dst)
      } else {
        FileUtil.copy(
          srcFS,
          sourcePath,
          dstFS,
          dst,
          false,
          true,
          sparkContext.hadoopConfiguration)
      }

      sparkContext.addFile(dst.toString, recursive = true)
    }
    dst.toString
  }

  private def copyIndexToLocal(
      source: Path,
      destination: Path,
      sparkContext: SparkContext): Unit = {

    /** We copy rather than move here: moving the index causes re-saving of
      * utilized storage to fail with bad CRC errors.
      */
    val fileSystemDestination = destination.getFileSystem(sparkContext.hadoopConfiguration)
    val fileSystemSource = source.getFileSystem(sparkContext.hadoopConfiguration)

    if (fileSystemDestination.exists(destination)) {
      return
    }

    if (fileSystemSource.getScheme == "s3a" && fileSystemDestination.getScheme == "file") {
      ResourceDownloader.downloadS3Directory(
        source.toString,
        destination.toString,
        isIndex = true)
      sparkContext.addFile(destination.toString, recursive = true)
      return
    }

    if (fileSystemDestination.getScheme != "s3a") {
      fileSystemDestination.copyFromLocalFile(false, true, source, destination)
    }
  }

}
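
Taken together, load and save round-trip a RocksDB index: load ships a serialized index to wherever the cluster can see it and opens a connection, and save copies that connection's local index back out. Below is a minimal usage sketch; the SparkSession settings, paths, database name, and storageRef are hypothetical placeholders, and only the StorageHelper signatures come from the file above.

import com.johnsnowlabs.storage.StorageHelper
import org.apache.spark.sql.SparkSession

object StorageHelperSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("storage-helper-sketch") // hypothetical app name
      .master("local[*]")
      .getOrCreate()

    // Resolves the serialized index under the source path, copies it to the
    // destination chosen by StorageLocator, and opens a RocksDB connection.
    val connection = StorageHelper.load(
      storageSourcePath = "/tmp/embeddings_source", // hypothetical path
      spark = spark,
      database = "EMBEDDINGS",   // hypothetical database name
      storageRef = "glove_100d", // hypothetical storage ref
      withinStorage = false)

    // Persists the local index back out; withinStorage = true appends the
    // /storage/ suffix seen in save above.
    StorageHelper.save(
      path = "/tmp/embeddings_saved", // hypothetical path
      connection = connection,
      spark = spark,
      withinStorage = true)

    spark.stop()
  }
}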