Python 数据持久化 - Cassandra 驱动程序


Cassandra 是另一种流行的 NoSQL 数据库。高可扩展性、一致性和容错性——这些是 Cassandra 的一些重要特性。这是列存储数据库。数据存储在许多商用服务器上。因此,数据高度可用。

Cassandra 是 Apache 软件基金会的产品。数据以分布式方式存储在多个节点上。每个节点都是由密钥空间组成的单个服务器。Cassandra 数据库的基本构建块是键空间,可以将其视为类似于数据库。

Cassandra 一个节点中的数据通过对等节点网络复制到其他节点中。这使得 Cassandra 成为一个万无一失的数据库。该网络称为数据中心。多个数据中心可以互连形成集群。复制的性质是通过在创建键空间时设置复制策略和复制因子来配置的。

一个键空间可能有多个列族——就像一个数据库可能包含多个表一样。Cassandra 的键空间没有预定义的架构。Cassandra 表中的每一行可能具有不同名称和不同数量的列。

Cassandra表

Cassandra 软件还有两个版本:社区版和企业版。Cassandra 的最新企业版可从https://cassandra.apache.org/download/下载。社区版位于https://academy.datastax.com/planet-cassandra/cassandra

Cassandra 有自己的查询语言,称为Cassandra 查询语言 (CQL)。CQL 查询可以从 CQLASH shell 内部执行 - 类似于 MySQL 或 SQLite shell。CQL 语法与标准 SQL 类似。

Datastax 社区版还附带了一个 Develcenter IDE,如下图所示 -

开发中心IDE

用于使用 Cassandra 数据库的 Python 模块称为Cassandra Driver。它也是由 Apache 基金会开发的。该模块包含一个 ORM API,以及一个本质上类似于关系数据库的 DB-API 的核心 API。

使用pip 实用程序可以轻松安装 Cassandra 驱动程序。

pip3 install cassandra-driver

与 Cassandra 数据库的交互是通过 Cluster 对象完成的。Cassandra.cluster 模块定义 Cluster 类。我们首先需要声明 Cluster 对象。

from cassandra.cluster import Cluster
clstr=Cluster()

所有事务(例如插入/更新等)都是通过使用密钥空间启动会话来执行的。

session=clstr.connect()

要创建新的键空间,请使用会话对象的execute()方法。execute() 方法采用一个字符串参数,该参数必须是查询字符串。CQL 具有 CREATE KEYSPACE 语句,如下所示。完整代码如下 -

from cassandra.cluster import Cluster
clstr=Cluster()
session=clstr.connect()
session.execute(“create keyspace mykeyspace with replication={
   'class': 'SimpleStrategy', 'replication_factor' : 3
};”

这里,SimpleStrategy是复制策略的值,复制因子设置为3。如前所述,一个键空间包含一个或多个表。每个表都以其数据类型为特征。Python 数据类型会根据下表自动解析为相应的 CQL 数据类型 -

Python类型 CQL型
没有任何 无效的
布尔 布尔值
漂浮 浮动、双
整型、长整型 int、bigint、varint、smallint、tinyint、计数器
小数.小数 十进制
字符串,统一码 ascii、varchar、文本
缓冲区、字节数组 斑点
日期 日期
约会时间 时间戳
时间 时间
列表、元组、生成器 列表
设定、冻结
字典,有序字典 地图
uuid.UUID 时间uuid, uuid

要创建表,请使用会话对象执行用于创建表的 CQL 查询。

from cassandra.cluster import Cluster
clstr=Cluster()
session=clstr.connect('mykeyspace')
qry= '''
create table students (
   studentID int,
   name text,
   age int,
   marks int,
   primary key(studentID)
);'''
session.execute(qry)

如此创建的键空间可以进一步用于插入行。INSERT 查询的 CQL 版本与 SQL Insert 语句类似。以下代码在学生表中插入一行。

from cassandra.cluster import Cluster
clstr=Cluster()
session=clstr.connect('mykeyspace')
session.execute("insert into students (studentID, name, age, marks) values 
   (1, 'Juhi',20, 200);"

正如您所期望的,SELECT 语句也与 Cassandra 一起使用。如果execute()方法包含SELECT查询字符串,它返回一个结果集对象,可以使用循环遍历该结果集对象。

from cassandra.cluster import Cluster
clstr=Cluster()
session=clstr.connect('mykeyspace')
rows=session.execute("select * from students;")
for row in rows:
print (StudentID: {} Name:{} Age:{} price:{} Marks:{}'
   .format(row[0],row[1], row[2], row[3]))

Cassandra 的 SELECT 查询支持使用 WHERE 子句对要获取的结果集应用过滤器。可以识别传统的逻辑运算符,如 <、> == 等。要仅检索学生表中年龄>20 的姓名行,execute() 方法中的查询字符串应如下所示 -

rows=session.execute("select * from students WHERE age>20 allow filtering;")

注意,使用ALLOW FILTERING。该语句的 ALLOW FILTERING 部分允许显式允许(某些)需要过滤的查询。

Cassandra 驱动程序 API 在其 cassendra.query 模块中定义了以下 Statement 类型的类。

简单语句

查询字符串中包含的简单、未准备的 CQL 查询。上面的所有例子都是SimpleStatement的例子。

批量报表

多个查询(例如 INSERT、UPDATE 和 DELETE)会放入批处理中并立即执行。每行首先转换为 SimpleStatement,然后批量添加。

让我们以元组列表的形式将要添加到 Students 表中的行如下 -

studentlist=[(1,'Juhi',20,100), ('2,'dilip',20, 110),(3,'jeevan',24,145)]

要使用 BathStatement 添加以上行,请运行以下脚本 -

from cassandra.query import SimpleStatement, BatchStatement
batch=BatchStatement()
for student in studentlist:
   batch.add(SimpleStatement("INSERT INTO students 
      (studentID, name, age, marks) VALUES
      (%s, %s, %s %s)"), (student[0], student[1],student[2], student[3]))
session.execute(batch)

准备好的声明

准备好的语句就像 DB-API 中的参数化查询。它的查询字符串由 Cassandra 保存以供以后使用。Session.prepare() 方法返回一个PreparedStatement 实例。

对于我们的学生表,INSERT 查询的PreparedStatement 如下 -

stmt=session.prepare("INSERT INTO students (studentID, name, age, marks) VALUES (?,?,?)")

随后只需发送参数值即可进行绑定。例如 -

qry=stmt.bind([1,'Ram', 23,175])

最后执行上面的绑定语句。

session.execute(qry)

这减少了网络流量和 CPU 利用率,因为 Cassandra 不必每次都重新解析查询。