Is there any way to pass Observable<String> into AbstractInputStreamContent?

Alex Kokorin 03/21/2018. 1 answers, 35 views
arrays google-drive-sdk google-drive-sdk rx-java rx-java2 google-drive-realtime-api google-drive-realtime-api

I'm working on uploading a text file to Google Drive

ByteArrayContent content = new ByteArrayContent("text/csv", fileContent.getBytes(Charset.forName("UTF-8")));
Drive.Files.Insert request = drive.files().insert(file, content);

where type(fileContent) = String

I'd like to refactor and change type of fileContent to Observable<String>, is there any nice workaround to pass it to that insert() function (which takes AbstractInputStreamContent as a second argument)?

Thanks

1 Answers


akarnokd 03/22/2018.

Here is a general Flowable -> InputStream bridge you can delegate to:

import java.io.*;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicReference;

import org.reactivestreams.*;

import io.reactivex.FlowableSubscriber;
import io.reactivex.internal.subscriptions.SubscriptionHelper;

public final class FlowableStringInputStream {

    private FlowableStringInputStream() {
        throw new IllegalStateException("No instances!");
    }

    public static InputStream createInputStream(
            Publisher<String> source, Charset charset) {
        StringInputStream parent = new StringInputStream(charset);
        source.subscribe(parent);
        return parent;
    }

    static final class StringInputStream extends InputStream
    implements FlowableSubscriber<String> {

        final AtomicReference<Subscription> upstream;

        final Charset charset;

        volatile byte[] bytes;

        int index;

        volatile boolean done;
        Throwable error;

        StringInputStream(Charset charset) {
            this.charset = charset;
            upstream = new AtomicReference<>();
        }

        @Override
        public void onSubscribe(Subscription s) {
            if (SubscriptionHelper.setOnce(upstream, s)) {
                s.request(1);
            }
        }

        @Override
        public void onNext(String t) {
            bytes = t.getBytes(charset);
            synchronized (this) {
                notifyAll();
            }
        }

        @Override
        public void onError(Throwable t) {
            error = t;
            done = true;
            synchronized (this) {
                notifyAll();
            }
        }

        @Override
        public void onComplete() {
            done = true;
            synchronized (this) {
                notifyAll();
            }
        }

        @Override
        public int read() throws IOException {
            for (;;) {
                byte[] a = awaitBufferIfNecessary();
                if (a == null) {
                    Throwable ex = error;
                    if (ex != null) {
                        if (ex instanceof IOException) {
                            throw (IOException)ex;
                        }
                        throw new IOException(ex);
                    }
                    return -1;
                }
                int idx = index;
                if (idx == a.length) {
                    index = 0;
                    bytes = null;
                    upstream.get().request(1);
                } else {
                    int result = a[idx] & 0xFF;
                    index = idx + 1;
                    return result;
                }
            }
        }

        byte[] awaitBufferIfNecessary() throws IOException {
            byte[] a = bytes;
            if (a == null) {
                synchronized (this) {
                    for (;;) {
                        boolean d = done;
                        a = bytes;
                        if (a != null) {
                            break;
                        }
                        if (d || upstream.get() == SubscriptionHelper.CANCELLED) {
                            break;
                        }
                        try {
                            wait();
                        } catch (InterruptedException ex) {
                            if (upstream.get() != SubscriptionHelper.CANCELLED) {
                                InterruptedIOException exc = new InterruptedIOException();
                                exc.initCause(ex);
                                throw exc;
                            }
                            break;
                        }
                    } 
                }
            }
            return a;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            if (off < 0 || len < 0 || off >= b.length || off + len > b.length) {
                throw new IndexOutOfBoundsException(
                    "b.length=" + b.length + ", off=" + off + ", len=" + len);
            }
            for (;;) {
                byte[] a = awaitBufferIfNecessary();
                if (a == null) {
                    Throwable ex = error;
                    if (ex != null) {
                        if (ex instanceof IOException) {
                            throw (IOException)ex;
                        }
                        throw new IOException(ex);
                    }
                    return -1;
                }
                int idx = index;
                if (idx == a.length) {
                    index = 0;
                    bytes = null;
                    upstream.get().request(1);
                } else {
                    int r = 0;
                    while (idx < a.length && len > 0) {
                        b[off] = a[idx];
                        idx++;
                        off++;
                        r++;
                        len--;
                    }
                    index = idx;
                    return r;
                }
            }
        }

        @Override
        public int available() throws IOException {
            byte[] a = bytes;
            int idx = index;
            return a != null ? Math.max(0, a.length - idx) : 0;
        }

        @Override
        public void close() throws IOException {
            SubscriptionHelper.cancel(upstream);
            synchronized (this) {
                notifyAll();
            }
        }
    }
}

Usage:

@Test(timeout = 10000)
public void async() throws Exception {
    AtomicInteger calls = new AtomicInteger();

    Flowable<String> f = Flowable.range(100, 10).map(Object::toString)
            .doOnCancel(() -> calls.incrementAndGet())
            .subscribeOn(Schedulers.computation())
            .delay(10, TimeUnit.MILLISECONDS);

    try (InputStream is = FlowableStringInputStream.createInputStream(f, utf8)) {
        assertEquals('1', is.read());
        assertEquals('0', is.read());
        assertEquals('0', is.read());

        byte[] buf = new byte[3];
        assertEquals(3, is.read(buf));

        assertArrayEquals("101".getBytes(utf8), buf);
    }

    assertEquals(1, calls.get());
}

Related questions

Hot questions

Language

Popular Tags