JohnSnowLabs / spark-nlp · build 6656626556 (push, via GitHub)

26 Oct 2023 03:52PM UTC · coverage: 63.11% (+0.2% from 62.895%) · 8867 of 14050 relevant lines covered (0.63 hits per line)

Commit by maziyarpanahi: Bump version and update CHANGELOG [run doc]

Source file: /src/main/scala/com/johnsnowlabs/storage/RocksDBConnection.scala · 81.48% of lines covered
/*
 * Copyright 2017-2022 John Snow Labs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.johnsnowlabs.storage

import org.apache.hadoop.fs.Path
import org.apache.spark.SparkFiles
import org.rocksdb.{CompressionType, Options, RocksDB}

import java.io.File

/** Manages the lifecycle of a single RocksDB handle for a storage index,
  * memoizing open connections in the companion object's cache. */
final class RocksDBConnection private (path: String) extends AutoCloseable {

  RocksDB.loadLibrary()
  @transient private var db: RocksDB = _

  def getFileName: String = path

  private def getOptions: Options = {
    val options = new Options()
    options.setCreateIfMissing(true)
    options.setCompressionType(CompressionType.NO_COMPRESSION)
    options.setWriteBufferSize(20 * 1 << 20) // 20 MiB write buffer
    options.setKeepLogFileNum(1)
    options.setDbLogDir(System.getProperty("java.io.tmpdir"))
    options.setMergeOperatorName("stringappend")

    options
  }

  /** Resolves the index on the local filesystem, falling back to files
    * distributed through SparkFiles when running on a cluster. */
  def findLocalIndex: String = {
    val localPath = RocksDBConnection.getLocalPath(path)
    if (new File(localPath).exists()) {
      localPath
    } else if (new File(path).exists()) {
      path
    } else {
      val localFromClusterPath = SparkFiles.get(path)
      require(
        new File(localFromClusterPath).exists(),
        s"Storage not found under given ref: $path\n" +
          "This usually means:\n" +
          "1. You have not loaded any storage under this ref, or one of your Storage-based " +
          "annotators has `includeStorage` set to false and must be loaded manually.\n" +
          "2. You are trying to use cluster mode without a proper shared filesystem.\n" +
          "3. You are trying to use a Kubernetes cluster without a proper shared filesystem. " +
          "In this case, try to enable the parameter to keep models in memory " +
          "(setEnableInMemoryStorage) if available.\n" +
          "4. Your source was not provided at storage creation.\n" +
          "5. If you are trying to use Storage defined elsewhere, make sure it has the " +
          "appropriate ref.")
      localFromClusterPath
    }
  }

  /** Opens the database read-write, reusing the existing handle if one is open. */
  def connectReadWrite: RocksDB = {
    if (Option(db).isDefined) {
      db
    } else {
      db = RocksDB.open(getOptions, findLocalIndex)
      RocksDBConnection.cache.update(path, this)
      db
    }
  }

  /** Opens the database read-only, preferring a cached connection for the same path. */
  def connectReadOnly: RocksDB = {
    if (RocksDBConnection.cache.contains(path)) {
      db = RocksDBConnection.cache(path).getDb
      db
    } else if (Option(db).isDefined)
      db
    else {
      db = RocksDB.openReadOnly(getOptions, findLocalIndex)
      RocksDBConnection.cache.update(path, this)
      db
    }
  }

  def reconnectReadWrite: RocksDB = {
    require(Option(db).isDefined, "Tried to reconnect on a closed connection")
    close()
    connectReadWrite
  }

  /** Closes the native handle and evicts this connection from the cache. */
  override def close(): Unit = {
    if (Option(db).isDefined) {
      db.close()
      db = null
      RocksDBConnection.cache.remove(path)
    }
  }

  def getDb: RocksDB = {
    if (Option(db).isEmpty)
      throw new Exception("Attempted to get a non-existing connection")
    db
  }

  def isConnected: Boolean = Option(db).isDefined

}

object RocksDBConnection {
  @transient private[storage] val cache: scala.collection.mutable.Map[String, RocksDBConnection] =
    scala.collection.mutable.Map.empty[String, RocksDBConnection]

  def getOrCreate(pathOrLocator: String): RocksDBConnection = {
    if (cache.contains(pathOrLocator)) cache(pathOrLocator)
    else new RocksDBConnection(pathOrLocator)
  }

  def getOrCreate(database: String, refName: String): RocksDBConnection = {
    val combinedName = StorageHelper.resolveStorageName(database, refName)
    getOrCreate(combinedName)
  }

  def getOrCreate(database: Database.Name, refName: String): RocksDBConnection =
    getOrCreate(database.toString, refName)

  /** Index location under Spark's root directory: <sparkRoot>/storage/<fileName>. */
  def getLocalPath(fileName: String): String = {
    Path
      .mergePaths(new Path(SparkFiles.getRootDirectory()), new Path("/storage/" + fileName))
      .toString
  }

}
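
For reference, here is a minimal usage sketch of the public lifecycle above (getOrCreate → connectReadWrite → close). The locator name EMBEDDINGS_demo_ref and the demo keys are hypothetical, and pre-creating the index directory stands in for a Storage-based annotator having written it first; treat this as a sketch under those assumptions, not Spark NLP's documented workflow.

import java.io.File

import org.apache.spark.sql.SparkSession
import com.johnsnowlabs.storage.RocksDBConnection

object RocksDBConnectionDemo extends App {
  // A local SparkSession is needed because findLocalIndex resolves paths
  // through SparkFiles.
  val spark = SparkSession.builder().appName("rocksdb-demo").master("local[*]").getOrCreate()

  // Hypothetical locator. Normally a Storage-based annotator creates this index;
  // here we pre-create an empty directory so connectReadWrite can open it
  // (getOptions sets createIfMissing = true).
  val locator = "EMBEDDINGS_demo_ref"
  new File(locator).mkdirs()

  val connection = RocksDBConnection.getOrCreate(locator)
  val db = connection.connectReadWrite // opens the handle and caches the connection

  // Raw byte-level access through the underlying RocksDB handle.
  db.put("hello".getBytes("UTF-8"), "world".getBytes("UTF-8"))
  println(new String(db.get("hello".getBytes("UTF-8")), "UTF-8"))

  // close() releases the native handle and evicts the cache entry.
  connection.close()
  assert(!connection.isConnected)

  spark.stop()
}

Note that a second getOrCreate(locator) while the connection is open returns the cached instance rather than opening the database twice; after close(), the cache entry is gone and a fresh connection is created.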