• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JohnSnowLabs / spark-nlp / 10656334014

01 Sep 2024 06:19PM UTC coverage: 62.392% (-0.02%) from 62.41%
10656334014

Pull #14355

github

web-flow
Merge 2a3ee298b into 50a69662f
Pull Request #14355: Implementing Mxbai Embeddings

0 of 2 new or added lines in 1 file covered. (0.0%)

27 existing lines in 7 files now uncovered.

8967 of 14372 relevant lines covered (62.39%)

0.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.41
/src/main/scala/com/johnsnowlabs/nlp/util/io/OutputHelper.scala
1
/*
2
 * Copyright 2017-2022 John Snow Labs
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *    http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package com.johnsnowlabs.nlp.util.io
18

19
import com.johnsnowlabs.client.CloudResources
20
import com.johnsnowlabs.client.util.CloudHelper
21
import com.johnsnowlabs.util.{ConfigHelper, ConfigLoader}
22
import org.apache.hadoop.fs.{FileSystem, Path}
23
import org.apache.spark.SparkFiles
24

25
import java.io.{File, FileWriter, PrintWriter}
26
import java.nio.charset.StandardCharsets
27
import scala.util.{Failure, Success, Try}
28

29
object OutputHelper {

  private lazy val fileSystem = getFileSystem

  private lazy val sparkSession = ResourceHelper.spark

  /** Hadoop [[FileSystem]] backing the Spark session's default configuration. */
  def getFileSystem: FileSystem = {
    FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
  }

  /** Hadoop [[FileSystem]] resolved from the scheme of `resource` (after normalization). */
  def getFileSystem(resource: String): FileSystem = {
    val resourcePath = new Path(parsePath(resource))
    FileSystem.get(resourcePath.toUri, sparkSession.sparkContext.hadoopConfiguration)
  }

  /** Normalizes a path string for Hadoop consumption:
    *   - rewrites the `s3://` scheme to `s3a://` (the scheme of Hadoop's S3A connector)
    *   - collapses `file:` followed by any number of slashes to canonical `file:///`
    * Paths with any other scheme are returned unchanged.
    */
  def parsePath(path: String): String = {
    val pathPrefix = path.split("://").head
    pathPrefix match {
      case "s3" =>
        // Only rewrite the scheme itself. The previous implementation used
        // path.replace("s3", "s3a"), which also corrupted any later "s3"
        // occurrence in the path (e.g. "s3://bucket/s3-data" -> ".../s3a-data").
        path.replaceFirst("^s3://", "s3a://")
      case "file" =>
        val pattern = """^file:(/+)""".r
        pattern.replaceAllIn(path, "file:///")
      case _ => path
    }
  }

  /** Checks whether `resource` exists on its file system.
    *
    * For local (`file`) paths several slash normalizations are attempted in order,
    * because callers hand in paths with inconsistent `file:/`, `file://`, `file:///`
    * prefixes. Returns the existence flag together with the path variant that was
    * actually found, or `None` when no variant exists.
    */
  def doesPathExists(resource: String): (Boolean, Option[Path]) = {
    val fileSystem = OutputHelper.getFileSystem(resource)

    fileSystem.getScheme match {
      case "file" =>
        // Try the raw path first, then progressively normalized variants,
        // preserving the original lookup order.
        val candidates = Seq(
          resource,
          resource.replaceFirst("//+", "///"),
          resource.replaceFirst("/+", "//"),
          """^file:/*""".r.replaceAllIn(resource, ""))
        candidates.find(candidate => safeExists(fileSystem, candidate)) match {
          case Some(found) => (true, Some(new Path(found)))
          case None => (false, None)
        }
      case _ =>
        // Fix: the parsed path was previously shadowed inside the Try block,
        // so the un-parsed `resource` was returned to the caller even though
        // existence had been checked against the parsed path.
        val modifiedPath = parsePath(resource)
        if (safeExists(fileSystem, modifiedPath)) (true, Some(new Path(modifiedPath)))
        else (false, None)
    }

  }

  /** True when `path` exists on `fs`; any exception counts as "does not exist". */
  private def safeExists(fs: FileSystem, path: String): Boolean =
    Try(fs.exists(new Path(path))).getOrElse(false)

  private def getLogsFolder: String =
    ConfigLoader.getConfigStringValue(ConfigHelper.annotatorLogFolder)

  private lazy val isDBFS =
    fileSystem.getScheme.equals("dbfs")

  // Path of the log file currently being written; set by writeAppend and
  // later read by exportLogFile.
  private var targetPath: Path = _

  // On DBFS, log lines are buffered here and flushed by exportLogFile,
  // because DBFS does not support appending to files.
  private var historyLog: Array[String] = Array()

  /** Appends `content` as one line to the log file `<uuid>.log` under the
    * resolved logs folder. The write strategy depends on the file system
    * scheme: DBFS buffers in memory, HDFS appends through the Hadoop API,
    * and local or cloud-staged paths use plain java.io appends.
    */
  def writeAppend(uuid: String, content: String, outputLogsPath: String): Unit = {
    val targetFolder = getTargetFolder(outputLogsPath)
    targetPath = new Path(targetFolder, uuid + ".log")

    fileSystem.getScheme match {
      case "dbfs" =>
        // DBFS cannot append; buffer the line and flush in exportLogFile.
        historyLog = historyLog ++ Array(content)
      case "hdfs" =>
        if (!fileSystem.exists(targetPath)) {
          fileSystem.createNewFile(targetPath)
        }
        val outputStream = fileSystem.append(targetPath)
        val writer = new PrintWriter(outputStream)
        writer.append(content + System.lineSeparator())
        writer.close()
      case _ =>
        if (CloudHelper.isCloudPath(targetFolder)) {
          fileSystem.createNewFile(targetPath)
          val fo = fileSystem.append(targetPath)
          val writer = new PrintWriter(fo, true)
          writer.append(content + System.lineSeparator())
          writer.close()
          fo.close()
        } else {
          if (!fileSystem.exists(new Path(targetFolder)))
            fileSystem.mkdirs(new Path(targetFolder))
          val fo = new File(targetPath.toUri.getRawPath)
          val writer = new FileWriter(fo, true)
          writer.append(content + System.lineSeparator())
          writer.close()
        }
    }
  }

  /** Resolves the folder logs are written to: the explicit `outputLogsPath`,
    * or the configured default when empty. Cloud destinations are staged in a
    * local Spark temp folder and uploaded later by exportLogFile.
    */
  private def getTargetFolder(outputLogsPath: String): String = {
    val path = if (outputLogsPath.isEmpty) getLogsFolder else outputLogsPath
    if (CloudHelper.isCloudPath(path)) SparkFiles.getRootDirectory() + "/tmp/logs" else path
  }

  /** Flushes any DBFS-buffered log lines to `targetPath` and, when
    * `outputLogsPath` points at cloud storage, uploads the staged log file.
    * Failures are reported as a warning instead of propagating, so logging
    * problems never abort the caller's job.
    */
  def exportLogFile(outputLogsPath: String): Unit = {
    try {
      if (isDBFS) {
        val charset = StandardCharsets.ISO_8859_1
        val outputStream = fileSystem.create(targetPath)
        historyLog
          .map(log => log + System.lineSeparator())
          .foreach(log => outputStream.write(log.getBytes(charset)))
        outputStream.close()
        historyLog = Array()
      }

      if (CloudHelper.isCloudPath(outputLogsPath)) {
        CloudResources.storeLogFileInCloudStorage(outputLogsPath, targetPath.toString)
      }
    } catch {
      case e: Exception =>
        println(
          s"Warning couldn't export log on DBFS or Cloud Storage because of error: ${e.getMessage}")
    }
  }

}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc