nest中未提供cassandraORM集成,typeorm中也没有对cassandra的支持,因此在nest中使用cassandra时不能做到像typeorm那样无缝的ORM体验。但我们可以通过自定义模块来对官方的cassandra-dirver驱动做一个简单ORM封装,实现与typeorm一致的集成体验。此项目已发布到npm仓库,可以直接下载使用:

# npm
npm i cassandra-orm4nest --save

# yarn
yarn add cassandra-orm4nest

第一步

首先应确定模块的设计目标,首先模块在使用时应像typeorm类似,提供一个forRootforFeature方法,forRoot方法接收数据库相关配置连接数据库,并将数据库连接客户端等导出为全局模块,forFeature接收实体数据,读取实体的元数据自动创建出相应的mapper对象并导出,最后提供一个InjectMapper装饰器可以根据实体注入生成的mapper。最终欲实现的效果如下:

@Module({
    imports: [
        // 连接数据库配置
        CassandraOrmModule.forRoot({
            contactPoints: ['localhost'],
            authProvider: new auth.PlainTextAuthProvider('username', 'password'),
            localDataCenter: 'datacenter1'
        }),
        // 读取实体元数据,自动生成相应的mapper
        CassandraOrmModule.forFeature([
            User,
            Order
        ])
    ],
    controllers: [UserController],
    providers: [UserService, OrderService]
})
export class OrmModule{}

service中可以如下注入,如:

@Injectable()
export class UserService {
    constructor(
        // 注入mapper对象,生成的mapper对象为cassandra-driver中Mapper
        @InjectMapper(User) private readonly userMapper: mapping.Mapper,
    ){}
}

最终下来,主要要实现以下两点:

  • 实体注解。实现应用于实体的@Entity@Column注解,用于设置实体元数据,并可将元数据保存起来,forFeatures方法中可以将实体的元数据读取出来,以便生成mapper对象
  • 主模块定义。实现主模块CassandraOrmModule,并实现forRootforFeatures方法。

创建工程:

mkdir cassandra-orm4nest
cd cassandra-orm4nest
npm init
touch tsconfig.json
touch tsconfig.build.json

ts编译配置如下:

tsconfig.json:

{
  "compilerOptions": {
    "module": "commonjs",
    "declaration": true,
    "removeComments": true,
    "emitDecoratorMetadata": true,
    "experimentalDecorators": true,
    "target": "es2017",
    "sourceMap": true,
    "outDir": "./dist",
    "baseUrl": "./",
    "incremental": true
  },
  "include": [
    "lib/**/*",
    "test/**/*"
  ],
  "exclude": [
    "node_modules",
    "dist",
    "resource",
    "out"
  ]
}

需要用到以下依赖:

npm i @nestjs/core --save
npm i @nestjs/common --save
npm i rxjs --save
npm i reflect-metadata --save
npm i cassandra-driver --save
npm i typescript --save-dev
npm i @types/node --save-dev

实体元数据设置实现

首先,实现实体元数据的保存,定义一个名为Entity的装饰器,设置实体所属的keyspace表名,元数据设置利用Reflect实现,创建table.decorator.ts,如下:

// entity.decorator.ts
interface TableDecoratorOptions {
    keyspace: string, // 所属命名空间
    table: string // 所属数据库
}

export default function Entity(options: TableDecoratorOptions): ClassDecorator {
    return target => {
        // 设置元数据
        Reflect.defineMetadata('keyspace', options.keyspace, target);
        Reflect.defineMetadata('table', options.table, target);
    }
}

若是报Reflect中找不到defineMetadata方法,是因为未引入reflect-metadata,在工程中任何一个ts中将其引入即可,一般在入口文件中引入一次,可以先创建一个index.ts入口文件,将其引入,如下:

// index.ts
import 'reflect-metadata';

然后创建一个名为Column的装饰器,作用于实体的属性,用来给字段设置别名,需要注意的时,对属性不能像对类一样直接使用Reflect.defineMetadata设置元数据,因为编译为js后,属性信息将会丢失,其元数据信息也会随之丢失,因此,需要定义一个结构专门保存属性的元数据,如下:

// metadata-storage.helper.ts

export interface ColumnMetadataOptions {
    dbName: string, // 字段在数据库中的名称
    isJonStr?: boolean, // 是否是JSON字符串,若数据库字段类型是字符串但存储的是JSON,可以设为true,则实体中属性字段类型可设为对象,在操作库时可自动来回转换
    toModel?: (property: any) => any, // 转换函数,默认为null,用于设置数据库值到实体中属性值的转换规则
    fromModel?: (colum: any) => any, // 转换函数,默认为null,用于设置实体中属性值到数据库中字段值的转换规则
    propertyName: string  // 对应到实体的属性名
}

export default class MetadataStorageHelper {
    
    // 存储类中每个属性的元数据
    private static metadatasMap: Map<string, ColumnMetadataOptions[]> = new Map<string, ColumnMetadataOptions[]>();


    /**
     * 获取类在metadatasMap中的key值
     */
    static getClassMetadataKey(className: string) {
        return `CassandraOrm_${className}`;
    }

    /**
     * 添加属性的元数据信息
     * @param className 类名称
     * @param options 元数据信息
     */
    public static addColumMetadata(className: string, options: ColumnMetadataOptions | ColumnMetadataOptions[]) {
        const key = this.getClassMetadataKey(className);
        let metas = this.metadatasMap.get(key);
        if (!metas) {
            metas = [];
            this.metadatasMap.set(key, metas);
        }
        if (options instanceof Array) {
            metas.push(...options);
        } else {
            metas.push(options)
        }
    }

    /**
     * 获取属性的元数据
     * @param className 类名
     * @param propertyName 属性名
     * @returns 
     */
    public static getColumMetadata(className: string, propertyName: string) {
        const key = this.getClassMetadataKey(className);
        const metas = this.metadatasMap.get(key);
        if (!metas) {
            throw new Error(`this is no metadatas for Objet: ${className}`);
        }
        return metas.find(m => m.propertyName === propertyName);
    }

    /**
     * 获取类的所有属性的元数据
     * @param className 类名
     */
    public static getColumMetadatasOfClass(className: string) {
        const key = this.getClassMetadataKey(className);
        return this.metadatasMap.get(key);
    }
}

然后实现Column注解:

// column.decorator.ts
import MetadataStorageHelper from "../helper/metadata-storage.helper";

interface ColumnDecoratorOptions {
    name: string; // 数据库中字段名称
    isJonStr?: boolean, // 是否是JSON字符串,若数据库字段类型是字符串但存储的是JSON,可以设为true,则实体中属性字段类型可设为对象,在操作库时可自动来回转换
    toModel?: (property: any) => any, // 转换函数,默认为null,用于设置数据库值到实体中属性值的转换规则
    fromModel?: (colum: any) => any, // 转换函数,默认为null,用于设置实体中属性值到数据库中字段值的转换规则
}

export default function Column(options?: ColumnDecoratorOptions) {
    // target为实体对象,key为属性名 
    return (target: Object, key: string) => {
        const className = target.constructor.name;
        MetadataStorageHelper.addColumMetadata(className, {
            propertyName: key,
            dbName: options ? options.name: key,
            isJonStr: options ? options.isJonStr || false : false,
            toModel: options? options.toModel || null : null,
            fromModel: options? options.fromModel || null : null
        });
    }
}

ORM模块实现

ORM模块主要要实现以下两点:

  • 数据库连接
  • 实体mapper生成

其中数据库连接封装成一个单独的全局模块,后续直接引用即可。

数据库连接模块实现

数据库连接即为生成cassandra-driverClient对象,如下:

import { DynamicModule, Global, Inject, Logger, Module } from "@nestjs/common";
import { Client, DseClientOptions } from "cassandra-driver";

@Global()
@Module({})
export default class CassandraConnectionModule {

    /**
     * 模块配置入口
     * @param {DseClientOptions} options 数据库配置
     * @returns {DynamicModule}
     */
    static forRegister(options: DseClientOptions): DynamicModule {
        // 将数据库配置封装为提供者
        const DseClientOptionsProvider = {
            provide: 'DseClientOptions',
            useValue: options
        };
        // 客户端提供者
        const CassandraClientProvider = {
            provide: 'CassandraClient',
            useFactory: async() => {
                const client = new Client(options);
                await client.connect().then(() => {
                    Logger.log(`cassandra connected to contact point: ${options.contactPoints.join(',')}`, 'CassandraClient');
                }).catch(e => {
                    console.log(e);
                    Logger.log(`connect to cassandra failed with contact point: ${options.contactPoints.join(',')}`, 'CassandraClient');
                });
                return client;
            }
        }
        return {
            module: CassandraConnectionModule,
            providers: [DseClientOptionsProvider, CassandraClientProvider],
            exports: [DseClientOptionsProvider, CassandraClientProvider]
        }
    }
}

实体Mapper生成实现

实现一个辅助函数,接收实体类,读取类的元数据信息,生成相应的Mapper,每个Mapper都封装成一个提供者,以便可以注入,如下:

// provider.helper.ts
import { Client, mapping } from "cassandra-driver";
import MetadataStorageHelper from "./metadata-storage.helper";

/**
 * 获取实体类对应的mapper的注入名称
 * @param Entity 实体类
 * @returns {string}
 */
export function getMapperInjectName(Entity: ObjectConstructor) {
    return `${Reflect.getMetadata('keyspace', Entity)}_${Reflect.getMetadata('table', Entity)}Mapper`;
}

/**
 * 读取类的元数据,生成mapper提供者
 * @param entities 实体类
 */
export function getMapperProviders(Entities: ObjectConstructor[]) {
    return (Entities || []).map(Entity => ({
        provide: getMapperInjectName(Entity),
        inject: ['CassandraClient'], // 注入cassandra客户端
        useFactory: (cassandraClient: Client) => {
            const tableName = Reflect.getMetadata('table', Entity) as string;
            const keyspace = Reflect.getMetadata('keyspace', Entity) as string;
            // 拿出实体中所有属性的元数据
            const columnMetas = MetadataStorageHelper.getColumMetadatasOfClass(Entity.name);
            // 数据库中列名与实体中列名的对应关系
            const columns: {[key: string]: string} = {};
            for (let meta of columnMetas) {
                if (!meta.fromModel && !meta.isJonStr && !meta.toModel) {
                    columns[meta.dbName] = meta.propertyName;
                } else {
                    const colInfo = {
                        name: meta.propertyName,
                    }
                    if (meta.isJonStr) {
                        colInfo['fromModel'] = JSON.stringify;
                        colInfo['toModel'] = JSON.parse;
                    }
                    if (meta.fromModel) {
                        colInfo['fromModel'] = meta.fromModel;
                    }
                    if (meta.toModel) {
                        colInfo['toModel'] = meta.toModel;
                    }
                }
            }
            const mappingOptions = {
                models: {
                    [tableName]: {
                        keyspace,
                        tables: [tableName],
                        columns, // 映射规则优先于mappings
                        mappings: new mapping.UnderscoreCqlToCamelCaseMappings(),  // 字段映射方式,数据库中下划线的命名方式会映射为对象中的驼峰命名方式
                    }
                }
            }
            // 生成mapper对象
            return new mapping.Mapper(cassandraClient, mappingOptions);
        }
    }));
}

主模块实现

将数据连接与Mapper生成提供者组合起来,实现主模块,也即入口模块,如下:

// cassandra-orm.module.ts
import { DynamicModule, Module } from "@nestjs/common";
import { DseClientOptions } from "cassandra-driver";

import { getMapperProviders } from "../helper/providers.helper";
import CassandraConnectionModule from "./cassandra-connection.module";

@Module({})
export default class CassandraOrmModule {
    /**
     * 
     * @param {DseClientOptions} options 数据库配置
     * @returns {DynamicModule}
     */
    static forRoot(options: DseClientOptions): DynamicModule {
        return {
            module: CassandraOrmModule,
            imports: [CassandraConnectionModule.forRegister(options)]
        };
    }

    /**
     * 
     * @param {any} Entities 实体类
     * @returns {DynamicModule}
     */
    static forFeature(Entities: any[]): DynamicModule {
        // 获取所有实体的mapper提供者
        const mapperProviders = getMapperProviders(Entities);
        return {
            module: CassandraOrmModule,
            providers: mapperProviders,
            exports: mapperProviders  // 将提供者导出
        }
    }
}

Mapper注入注解实现

现在功能基本完成,但还需要一个注解,以便在其它类(如Service)中注入生成的mapper,如下:

// inject-mapper.decorator.ts
import { Inject } from "@nestjs/common";

import { getMapperInjectName } from "../helper/providers.helper";

export default function InjectMapper(Entity: any) {
    // 根据名称注入
    return Inject(getMapperInjectName(Entity));
}

同时,也写一个注入cassandra连接客户端的注解,以便某些场景需直接注入客户端:

// inject-client.decorator.ts
import { Inject } from "@nestjs/common";

export default function InjectClient() {
    // 根据名称注入
    return Inject('CassandraClient');
}

基本CURD类封装

为了方便,可以封装一个基本的CURD服务基类,实现基本的CURD方法,创建Service时直接继承基类即可获取这些方法,如下:

// base-service.curd.ts

import { Client, mapping, QueryOptions, types } from "cassandra-driver";
// cassandra-driver内部实现的cql语句生成函数,默认没有导出,因此需要完整路径导入
import * as QueryGenerator from "cassandra-driver/lib/mapping/query-generator"
import * as DocInfoAdapter from "cassandra-driver/lib/mapping/doc-info-adapter";

import MetadataStorageHelper, { ColumnMetadataOptions } from "../helper/metadata-storage.helper";

type EntityConditionOptions<T> = {[key in keyof T]?: T[key] | mapping.q.QueryOperator};

/**
 * 服务基类
 */
export default class BaseService<T> {
    // 类型追加_modelMappingInfos是因为在使用cassandra-driver实现的转换函数将实体描述转为原始cql语句时,映射
    // 信息须作为一个参数传入,映射信息是作为私有属性存在在mapper里的,ts层面无法展现,但运行时由于编译为js,没有私有属性概念,
    // 可以直接获取,此处也是个妥协。
    private readonly _mapper: mapping.Mapper & {_modelMappingInfos: Map<string, any>};
    private readonly _client: Client;
    private readonly _Entity: any;

    protected readonly tableName: string;
    protected readonly keyspaceName: string;
    protected readonly modelMapper: mapping.ModelMapper<T>;
    protected readonly columnMetas: ColumnMetadataOptions[];
    constructor(client:  Client, mapper: mapping.Mapper, Entity: any) {
        this._client = client;
        this._mapper = mapper as any;
        this._Entity = Entity;

        this.keyspaceName = Reflect.getMetadata('keyspace', Entity) as string;
        this.tableName = Reflect.getMetadata('table', Entity) as string;
        this.columnMetas = MetadataStorageHelper.getColumMetadatasOfClass(Entity.name);
        this.modelMapper = mapper.forModel(this.tableName);
    }

    /**
     * 
     * @param row 数据库记录,将数据库记录转为实体
     * @returns {T}
     */
    protected row2entity(row: Iterable<{[key: string]: any}> | types.Row): T {
        const entity = new this._Entity();
        for (let meta of this.columnMetas) {
            entity[meta.propertyName] = row[meta.dbName];
        }
        return entity;
    }

    async saveOne(entity: T, docInfo?: mapping.InsertDocInfo, execOptions?: mapping.MappingExecutionOptions) {
        await this.modelMapper.insert(entity, docInfo, execOptions);
    }

    async saveMany(entities: T[], docInfo?: mapping.InsertDocInfo, execOptions?: mapping.MappingExecutionOptions) {
        // 做批量时cassandra有大小限制,在配置文件的batch_size_fail_threshold_in_kb项中可以配置,默认50kb,超过设置大小会导致批量配置失败
        const batches = entities.map(entity => this.modelMapper.batching.insert(entity, docInfo));
        await this._mapper.batch(batches, execOptions);
    }

    /**
     * 查询所有,需要注意的是由于cassandra-driver中mapper的机制,该方法不是返回的真正的全表数据,而是默认返回前5000条,
     * 表超过5000条记录时若要返回真正的全部数据,用findRealAll方法
     * @param docInfo 文档查询配置
     * @param execOptions 执行配置
     * @returns {T[]}
     */
    async findAll(docInfo?: mapping.FindDocInfo, execOptions?: mapping.MappingExecutionOptions): Promise<T[]> {
        return (await this.modelMapper.findAll(docInfo, execOptions)).toArray();
    }

    /**
     * 条件查询,需要注意的是由于cassandra-driver中mapper的机制,数据查询有5000条的数据限制,查询结果超过5000条只返回前5000条,
     * 超过5000条记录时若要返回真正的全部数据,用findRealMany方法
     * @param options 查询条件
     * @param docInfo 文档查询配置
     * @param execOptions 执行配置
     * @returns 
     */
    async findMany(conditions: EntityConditionOptions<T>, docInfo?: mapping.FindDocInfo, execOptions?: mapping.MappingExecutionOptions)  {
        return (await this.modelMapper.find(conditions, docInfo, execOptions)).toArray();
    }

    async findOne(conditions: EntityConditionOptions<T>, docInfo?: mapping.FindDocInfo, execOptions?: mapping.MappingExecutionOptions) {
        const di: mapping.FindDocInfo = docInfo || {limit: 1};
        if (!di.limit) {
            di.limit = 1;
        }
        const res = await this.modelMapper.find(conditions, di, execOptions);
        return res.toArray()[0] || null;
    }

    private getDbNameOfProperty(propertyName: string) {
        const meta = this.columnMetas.find(m => m.propertyName === propertyName);
        if (!meta) {
            throw new Error(`field ${propertyName} is not exist in ${this._Entity.name} `);
        }
        return meta.dbName;
    }


    async findRealMany(conditions: EntityConditionOptions<T>, docInfo?: mapping.FindDocInfo, options?: QueryOptions): Promise<T[]> {  
        const cp =  this.makeQueryCqlAndParams(conditions, docInfo);
        return this.findThroughEachRow(cp.cql, cp.params, options);
    }

    async findRealAll(docInfo?: mapping.FindDocInfo, options?: QueryOptions): Promise<T[]> {
        const cp =  this.makeQueryCqlAndParams(null, docInfo);
        return this.findThroughEachRow(cp.cql, cp.params, options);
    }

    async update(values: EntityConditionOptions<T>, docInfo?: mapping.UpdateDocInfo, execOptions?: mapping.MappingExecutionOptions) {
        const result = await this.modelMapper.update(values, docInfo, execOptions);
        return result.toArray();
    }

    async updateMany(values: Array<{value: EntityConditionOptions<T>, docInfo?: mapping.UpdateDocInfo}>, execOptions?: mapping.MappingExecutionOptions) {
        const batches = values.map(v => this.modelMapper.batching.update(v.value, v.docInfo || null));
        const result = await this._mapper.batch(batches, execOptions);
        return result.toArray();
    }

    async remove(values: EntityConditionOptions<T>, docInfo?: mapping.UpdateDocInfo, execOptions?: mapping.MappingExecutionOptions) {
        const result = await this.modelMapper.remove(values, docInfo, execOptions);
        return result.toArray();
    }

    async removeMany(values: Array<{value:  EntityConditionOptions<T>, docInfo?: mapping.UpdateDocInfo}>, execOptions?: mapping.MappingExecutionOptions) {
        const batches = values.map(v => this.modelMapper.batching.remove(v.value, v.docInfo || null));
        const result = await this._mapper.batch(batches, execOptions);
        return result.toArray();
    }

    /**
     * 用eachRow查询所有记录,由于此方法直接执行的cql语句,因此在使用时需要进行属性名到数据库记录名的转换
     * @param {string} cql cql语句
     * @param {any} params 参数
     * @param {QueryOptions} options 查询参数
     * @returns {Promise<T[]>}
     */
    private findThroughEachRow(cql: string, params: any, options?: QueryOptions): Promise<T[]> {
        return new Promise((resolve, reject) => {
            const results: T[] = [];
            this._client.eachRow(cql, params || null, options, (_, row) => {
                results.push(this.row2entity(row));
            }, (err, result) => {
                if (err) {
                    reject(err);
                } else {
                    if (result && result.nextPage) {
                        result.nextPage();
                    } else {
                        resolve(results);
                    }
                }
            });
        })
    }
    
    /**
     * 根据实体描述生成原始cql查询语句与参数
     * 转化过程采用cassandra-driver内部实现的转换函数
     * @param conditions 查询条件
     * @param docInfo 文档查询配置
     * @returns 
     */
     private makeQueryCqlAndParams(conditions?: EntityConditionOptions<T>, docInfo?: mapping.FindDocInfo): {cql: string, params: any[]} {
        const mappingInfo = this._mapper._modelMappingInfos.get(this.tableName);
        let propertiesInfo: Array<{columnName: string, value: any}> = [];
        if (conditions) {
            propertiesInfo = DocInfoAdapter.getPropertiesInfo(Object.keys(conditions), docInfo, conditions, mappingInfo);
        }
        const fieldsInfo: string[] = [];
        if (docInfo && docInfo.fields) {
            for (let field of docInfo.fields) {
                fieldsInfo.push(this.getDbNameOfProperty(field));
            }
        }

        const orders: string[][] = [];
        if (docInfo && docInfo.orderBy) {
            for (let key of Object.keys(docInfo.orderBy)) {
                orders.push([this.getDbNameOfProperty(key), docInfo.orderBy[key]]);
            }
        }
        const limit = docInfo && docInfo.limit ? docInfo.limit : null;
        // const p = QueryGenerator.selectParamsGetter(propertiesInfo, limit);
        // const cql = QueryGenerator.getSelect(this.tableName, this.keyspaceName, propertiesInfo, fieldsInfo, orders, limit);
        // console.log(p)
        return {
            cql: QueryGenerator.getSelect(this.tableName, this.keyspaceName, propertiesInfo, fieldsInfo, orders, limit),
            params: QueryGenerator.selectParamsGetter(propertiesInfo, limit)(conditions, docInfo, mappingInfo)
        };
    }
}

需要注意的是:

  • 在使用mapperfind*查询函数时,有5000的条数限制,即使findAll也只返回前5000条,因此,为了实现大于5000的数据查询,封装了一个findRealManyfindRealAll方法,采用eachRow进行查询,但eachRow接收的是原始的cql语句与参数,因此,在使用时需要先进行一步实体属性条件到数据库字段名称的转换并依据条件生成对应的原始cql语句,由于cassandra-driver内部有cql语句生成的实现(lib/mapping中),因此,直接使用即可,但默认没有导出,需要使用完整路径导入,不方便的一点是文件是js的且没有相应的.d.ts描述文件,缺少了类型提示。
  • 可能令人迷惑的一点是,update操作的第一个参数同时包含了条件与更新值,cassandra-driver会筛选主键字段作为条件(where),其它字段作为参数更新,其第二个参数中的when只是对应于cassandraiffields限制了要更新的字段,若设置了第二个参数的fields属性,则只会更新设置的属性,且fields应包含需更新的字段以及所有主键字段。另外,cassandra在执行更新操作时必须指定所有主键,不能只指定部分主键。原因在于若存在多个集群键但更新时只指定了部分集群键,则cassandra无法判断要更新的数据具体存在在哪台机器上,必须扫描整个集群,这样性能会很低。
  • Delete操作与update类似,删除时必须指定所有主键,但与更新操作不同的是,cassandra本身并没有限制必须指定所有主键,这是在cassandra-driver中限制的,应该也是为了效率考虑。

导出

可以将所有用户所需的函数或对象在index.ts中导出,这样使用时直接通过index.ts导入即可,不用通过具体的文件夹导入:

// index.ts
import 'reflect-metadata';

export { default as CassandraOrmModule } from './module/cassandra-orm.module';

export { default as BaseService } from './curd/base-service.curd';

export { default as InjectMapper } from './decorator/Inject-mapper.decorator';
export { default as Column } from './decorator/column.decorator';
export { default as InjectClient} from './decorator/inject-client.decorator';
export { default as Entity } from './decorator/entity.decorator';

测试

创建test目录,对模块进行测试,创建一个数据库表格:

CREATE TABLE wt_test.device (
    serial_number varchar,
    create_time timestamp,
    version varchar,
    is_online boolean,
    PRIMARY KEY ((serial_number), create_time)
) WITH CLUSTERING ORDER BY (create_time DESC);

首先创建对应实体:

// device.entity.ts
import { Column, Entity } from "../lib";

@Entity({
    keyspace: 'wt_test',
    table: 'device'
})
export default class Device {
    @Column({name: 'serial_number'})
    serialNumber: string;

    @Column({name: 'create_time'})
    createTime: Date;

    @Column()
    version: string;

    @Column({name: 'is_online'})
    isOnline: boolean;
}

编写service服务类:

// device.service.ts
import { Injectable } from "@nestjs/common";
import { Client } from "cassandra-driver";

import { InjectClient, InjectMapper, BaseService } from "../lib";
import Device from "./device.entity";

@Injectable()
export default class DeviceService extends BaseService<Device> { // 继承服务基类
    constructor(
        @InjectMapper(Device) private readonly mapper,
        @InjectClient() client: Client
    ) {
        super(client, mapper, Device); // 父类构造
    }
}

然后写一个controller,验证基本方法:

// device.controller.ts
import { Controller, Get } from "@nestjs/common";
import { mapping } from 'cassandra-driver';
import { assert } from "console";

import Device from "./device.entity";
import DeviceService from "./device.service";

const q = mapping.q;

@Controller('device')
export default class DeviceController {
    constructor(
        private readonly deviceService: DeviceService
    ){}

    @Get('doSomthing')
    async doSomething() {
        const device1 = new Device();
        device1.serialNumber = 'dev1';
        device1.createTime = new Date();
        device1.isOnline = false;
        device1.version = "v1";

        const device2 = new Device();
        device2.serialNumber = 'dev2';
        device2.createTime = new Date();
        device2.isOnline = true;
        device2.version = "v1";

        console.log('----------saveMany-----------');
        await this.deviceService.saveMany([device1, device2]);

        console.log('----------findAll-----------');
        const allDevices = await this.deviceService.findAll();
        assert(allDevices.length == 2, 'assert 1');

        console.log('----------findRealAll-----------');
        const allRealDevices = await this.deviceService.findRealAll();
        assert(allRealDevices.length == 2, 'assert 2');

        console.log('----------findMany-----------');
        const devs1 = await this.deviceService.findMany({serialNumber: 'dev1', createTime: q.lte(device1.createTime)});
        assert(devs1.length == 1 && devs1[0].serialNumber == 'dev1', 'assert 3');

        console.log('----------findRealMany-----------');
        const devs2 = await this.deviceService.findRealMany({serialNumber: 'dev2', createTime: q.gte(device2.createTime)});
        assert(devs2.length == 1 && devs2[0].serialNumber == 'dev2', 'assert 4');

        console.log('----------update-----------');
        const dev2 = devs2[0];
        dev2.version = 'v2';
        dev2.isOnline = !dev2.isOnline;
        await this.deviceService.update(dev2, {fields: ['serialNumber', 'createTime', 'version']});
        const v2New = await this.deviceService.findOne({serialNumber: 'dev2'});
        assert(v2New.version == 'v2' && v2New.isOnline == true, 'assert 5');

        const dev1 = devs1[0];
        dev1.isOnline = false;
        await this.deviceService.updateMany([{
            value: dev1,
            docInfo: {fields: ['serialNumber', 'createTime', 'isOnline']} 
        }]);
        const dev1New = await this.deviceService.findOne({serialNumber: 'dev1'});
        assert(dev1New.isOnline == false);

        console.log('----------remove1-----------');
        await this.deviceService.remove({serialNumber: 'dev1', createTime: q.gte(device1.createTime)});
        assert((await this.deviceService.findAll()).length == 1, 'assert 6');

        console.log('----------remove2-----------');
        await this.deviceService.removeMany([{value: {serialNumber: 'dev2', createTime: q.gte(device1.createTime)}}]);
        assert((await this.deviceService.findAll()).length == 0, 'assert 7');
        return { msg: "all finished ..."};
    }
}

编写模块,配置cassandra并注册服务类与控制类:

// orm-test.module.ts
import { Module } from "@nestjs/common";
import { auth } from "cassandra-driver";

import { CassandraOrmModule } from "../lib";
import DeviceController from "./device.controller";
import Device from "./device.entity";
import DeviceService from "./device.service";

@Module({
    imports: [
        CassandraOrmModule.forRoot({
            contactPoints: ['localhost'],
            authProvider: new auth.PlainTextAuthProvider('username', 'password'),
            localDataCenter: 'datacenter1'
        }),
        CassandraOrmModule.forFeature([
            Device
        ])
    ],
    controllers: [DeviceController],
    providers: [DeviceService]
})
export default class OrmTestModule {}

最后,编写入口函数,启动接口服务,需要安装express支持:

npm i @nestjs/platform-express --save-dev

入口函数如下:

// test.server.ts
import 'reflect-metadata'; // 引入元数据支持
import { NestFactory } from "@nestjs/core";
import { NestExpressApplication } from "@nestjs/platform-express";
import OrmTestModule from "./orm-test.module";

async function bootstrap() {
    const port = 30000;
    const app = await NestFactory.create<NestExpressApplication>(OrmTestModule);
    await app.listen(port, () => console.log(`server listening on port :${port}`));
}
bootstrap();

启动:

npx ts-node test\test.server.ts

浏览器访问链接localhost:30000/device/doSomthing测试。

这里查看完整示例。