Я пытаюсь создать Cassandra SSTables из результатов пакетного вычисления в Spark. В идеале каждый раздел должен создавать SSTable для хранящихся в нем данных, чтобы максимально распараллелить процесс (и, возможно, даже передать его в кольцо Cassandra).
После первоначальных препятствий с CQLSSTableWriter
(например, с требованием файла yaml) я столкнулся с этой проблемой:
java.lang.RuntimeException: Attempting to load already loaded column family customer.rawts
at org.apache.cassandra.config.Schema.load(Schema.java:347)
at org.apache.cassandra.config.Schema.load(Schema.java:112)
at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336)
Я создаю средство записи для каждого параллельного раздела следующим образом:
def store(rdd:RDD[Message]) = {
rdd.foreachPartition( msgIterator => {
val writer = CQLSSTableWriter.builder()
.inDirectory("/tmp/cass")
.forTable(schema)
.using(insertSttmt).build()
msgIterator.foreach(msg => {...})
})}
И если я правильно читаю исключение, я могу создать только одну запись для каждой таблицы в одной JVM. Я предполагаю, что записи в writer
не будут потокобезопасными, и даже если они будут конкуренцией, которую создадут несколько потоков, если все параллельные задачи будут пытаться сбрасывать несколько ГБ данных на диск одновременно, это приведет к поражению цели использования SSTables для массовой загрузки в любом случае.
Итак, есть ли способы одновременного использования CQLSSTableWriter
?
Если нет, то какой следующий лучший вариант для загрузки пакетных данных с высокой пропускной способностью в Cassandra?