/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.table;

import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueFileStore;
import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.table.AbstractFileStoreTable;
import org.apache.flink.table.store.table.sink.MemoryTableWrite;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.KeyValueTableRead;
import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
import org.apache.flink.table.store.table.source.SplitGenerator;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

public class ChangelogValueCountFileStoreTable
extends AbstractFileStoreTable {
    private static final long serialVersionUID = 1L;
    private final KeyValueFileStore store;

    ChangelogValueCountFileStoreTable(Path path, SchemaManager schemaManager, TableSchema tableSchema) {
        super(path, tableSchema);
        RowType countType = RowType.of((LogicalType[])new LogicalType[]{new BigIntType(false)}, (String[])new String[]{"_VALUE_COUNT"});
        ValueCountMergeFunction mergeFunction = new ValueCountMergeFunction();
        this.store = new KeyValueFileStore(schemaManager, tableSchema.id(), new CoreOptions(tableSchema.options()), tableSchema.logicalPartitionType(), tableSchema.logicalBucketKeyType(), tableSchema.logicalRowType(), countType, mergeFunction);
    }

    @Override
    public TableScan newScan() {
        final KeyValueFileStoreScan scan = this.store.newScan();
        return new TableScan(scan, this.tableSchema, this.store.pathFactory()){

            @Override
            protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
                return new MergeTreeSplitGenerator(ChangelogValueCountFileStoreTable.this.store.newKeyComparator(), ChangelogValueCountFileStoreTable.this.store.options().splitTargetSize(), ChangelogValueCountFileStoreTable.this.store.options().splitOpenFileCost());
            }

            @Override
            protected void withNonPartitionFilter(Predicate predicate) {
                scan.withKeyFilter(predicate);
            }
        };
    }

    @Override
    public TableRead newRead() {
        return new KeyValueTableRead(this.store.newRead()){

            @Override
            public TableRead withFilter(Predicate predicate) {
                this.read.withFilter(predicate);
                return this;
            }

            @Override
            public TableRead withProjection(int[][] projection) {
                this.read.withKeyProjection(projection);
                return this;
            }

            @Override
            protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
                return new ValueCountRowDataRecordIterator(kvRecordIterator);
            }
        };
    }

    @Override
    public TableWrite newWrite() {
        SinkRecordConverter recordConverter = new SinkRecordConverter(this.store.options().bucket(), this.tableSchema);
        return new MemoryTableWrite<KeyValue>((FileStoreWrite)this.store.newWrite(), recordConverter, this.store.options()){
            private final KeyValue kv;
            {
                this.kv = new KeyValue();
            }

            @Override
            protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer) throws Exception {
                switch (record.row().getRowKind()) {
                    case INSERT: 
                    case UPDATE_AFTER: {
                        this.kv.replace(record.row(), RowKind.INSERT, (RowData)GenericRowData.of((Object[])new Object[]{1L}));
                        break;
                    }
                    case UPDATE_BEFORE: 
                    case DELETE: {
                        this.kv.replace(record.row(), RowKind.INSERT, (RowData)GenericRowData.of((Object[])new Object[]{-1L}));
                        break;
                    }
                    default: {
                        throw new UnsupportedOperationException("Unknown row kind " + record.row().getRowKind());
                    }
                }
                writer.write(this.kv);
            }
        };
    }

    public KeyValueFileStore store() {
        return this.store;
    }
}

